]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/dpdk/drivers/event/sw/sw_evdev_scheduler.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / dpdk / drivers / event / sw / sw_evdev_scheduler.c
CommitLineData
11fdf7f2
TL
1/* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2016-2017 Intel Corporation
3 */
4
5#include <rte_ring.h>
6#include <rte_hash_crc.h>
7#include <rte_event_ring.h>
8#include "sw_evdev.h"
9#include "iq_chunk.h"
10
11#define SW_IQS_MASK (SW_IQS_MAX-1)
12
13/* Retrieve the highest priority IQ or -1 if no pkts available. Doing the
14 * CLZ twice is faster than caching the value due to data dependencies
15 */
16#define PKT_MASK_TO_IQ(pkts) \
17 (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
18
19#if SW_IQS_MAX != 4
20#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
21#endif
22#define PRIO_TO_IQ(prio) (prio >> 6)
23
24#define MAX_PER_IQ_DEQUEUE 48
25#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
26/* use cheap bit mixing, we only need to lose a few bits */
27#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
28
29static inline uint32_t
30sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
31 uint32_t iq_num, unsigned int count)
32{
33 struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
34 struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
35 uint32_t nb_blocked = 0;
36 uint32_t i;
37
38 if (count > MAX_PER_IQ_DEQUEUE)
39 count = MAX_PER_IQ_DEQUEUE;
40
41 /* This is the QID ID. The QID ID is static, hence it can be
42 * used to identify the stage of processing in history lists etc
43 */
44 uint32_t qid_id = qid->id;
45
46 iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
47 for (i = 0; i < count; i++) {
48 const struct rte_event *qe = &qes[i];
49 const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
50 struct sw_fid_t *fid = &qid->fids[flow_id];
51 int cq = fid->cq;
52
53 if (cq < 0) {
9f95a23c
TL
54 uint32_t cq_idx;
55 if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
11fdf7f2 56 qid->cq_next_tx = 0;
9f95a23c
TL
57 cq_idx = qid->cq_next_tx++;
58
11fdf7f2
TL
59 cq = qid->cq_map[cq_idx];
60
61 /* find least used */
62 int cq_free_cnt = sw->cq_ring_space[cq];
63 for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
64 cq_idx++) {
65 int test_cq = qid->cq_map[cq_idx];
66 int test_cq_free = sw->cq_ring_space[test_cq];
67 if (test_cq_free > cq_free_cnt) {
68 cq = test_cq;
69 cq_free_cnt = test_cq_free;
70 }
71 }
72
73 fid->cq = cq; /* this pins early */
74 }
75
76 if (sw->cq_ring_space[cq] == 0 ||
77 sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
78 blocked_qes[nb_blocked++] = *qe;
79 continue;
80 }
81
82 struct sw_port *p = &sw->ports[cq];
83
84 /* at this point we can queue up the packet on the cq_buf */
85 fid->pcount++;
86 p->cq_buf[p->cq_buf_count++] = *qe;
87 p->inflights++;
88 sw->cq_ring_space[cq]--;
89
90 int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
91 p->hist_list[head].fid = flow_id;
92 p->hist_list[head].qid = qid_id;
93
94 p->stats.tx_pkts++;
95 qid->stats.tx_pkts++;
96 qid->to_port[cq]++;
97
98 /* if we just filled in the last slot, flush the buffer */
99 if (sw->cq_ring_space[cq] == 0) {
100 struct rte_event_ring *worker = p->cq_worker_ring;
101 rte_event_ring_enqueue_burst(worker, p->cq_buf,
102 p->cq_buf_count,
103 &sw->cq_ring_space[cq]);
104 p->cq_buf_count = 0;
105 }
106 }
107 iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
108
109 return count - nb_blocked;
110}
111
112static inline uint32_t
113sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
114 uint32_t iq_num, unsigned int count, int keep_order)
115{
116 uint32_t i;
117 uint32_t cq_idx = qid->cq_next_tx;
118
119 /* This is the QID ID. The QID ID is static, hence it can be
120 * used to identify the stage of processing in history lists etc
121 */
122 uint32_t qid_id = qid->id;
123
124 if (count > MAX_PER_IQ_DEQUEUE)
125 count = MAX_PER_IQ_DEQUEUE;
126
127 if (keep_order)
128 /* only schedule as many as we have reorder buffer entries */
129 count = RTE_MIN(count,
130 rte_ring_count(qid->reorder_buffer_freelist));
131
132 for (i = 0; i < count; i++) {
133 const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
134 uint32_t cq_check_count = 0;
135 uint32_t cq;
136
137 /*
138 * for parallel, just send to next available CQ in round-robin
139 * fashion. So scan for an available CQ. If all CQs are full
140 * just return and move on to next QID
141 */
142 do {
143 if (++cq_check_count > qid->cq_num_mapped_cqs)
144 goto exit;
9f95a23c 145 if (cq_idx >= qid->cq_num_mapped_cqs)
11fdf7f2 146 cq_idx = 0;
9f95a23c
TL
147 cq = qid->cq_map[cq_idx++];
148
11fdf7f2
TL
149 } while (rte_event_ring_free_count(
150 sw->ports[cq].cq_worker_ring) == 0 ||
151 sw->ports[cq].inflights == SW_PORT_HIST_LIST);
152
153 struct sw_port *p = &sw->ports[cq];
154 if (sw->cq_ring_space[cq] == 0 ||
155 p->inflights == SW_PORT_HIST_LIST)
156 break;
157
158 sw->cq_ring_space[cq]--;
159
160 qid->stats.tx_pkts++;
161
162 const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
163 p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
164 p->hist_list[head].qid = qid_id;
165
166 if (keep_order)
167 rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
168 (void *)&p->hist_list[head].rob_entry);
169
170 sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
171 iq_pop(sw, &qid->iq[iq_num]);
172
173 rte_compiler_barrier();
174 p->inflights++;
175 p->stats.tx_pkts++;
176 p->hist_head++;
177 }
178exit:
179 qid->cq_next_tx = cq_idx;
180 return i;
181}
182
183static uint32_t
184sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
185 uint32_t iq_num, unsigned int count __rte_unused)
186{
187 uint32_t cq_id = qid->cq_map[0];
188 struct sw_port *port = &sw->ports[cq_id];
189
190 /* get max burst enq size for cq_ring */
191 uint32_t count_free = sw->cq_ring_space[cq_id];
192 if (count_free == 0)
193 return 0;
194
195 /* burst dequeue from the QID IQ ring */
196 struct sw_iq *iq = &qid->iq[iq_num];
197 uint32_t ret = iq_dequeue_burst(sw, iq,
198 &port->cq_buf[port->cq_buf_count], count_free);
199 port->cq_buf_count += ret;
200
201 /* Update QID, Port and Total TX stats */
202 qid->stats.tx_pkts += ret;
203 port->stats.tx_pkts += ret;
204
205 /* Subtract credits from cached value */
206 sw->cq_ring_space[cq_id] -= ret;
207
208 return ret;
209}
210
211static uint32_t
212sw_schedule_qid_to_cq(struct sw_evdev *sw)
213{
214 uint32_t pkts = 0;
215 uint32_t qid_idx;
216
217 sw->sched_cq_qid_called++;
218
219 for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
220 struct sw_qid *qid = sw->qids_prioritized[qid_idx];
221
222 int type = qid->type;
223 int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
224
225 /* zero mapped CQs indicates directed */
9f95a23c 226 if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
11fdf7f2
TL
227 continue;
228
229 uint32_t pkts_done = 0;
230 uint32_t count = iq_count(&qid->iq[iq_num]);
231
232 if (count > 0) {
233 if (type == SW_SCHED_TYPE_DIRECT)
234 pkts_done += sw_schedule_dir_to_cq(sw, qid,
235 iq_num, count);
236 else if (type == RTE_SCHED_TYPE_ATOMIC)
237 pkts_done += sw_schedule_atomic_to_cq(sw, qid,
238 iq_num, count);
239 else
240 pkts_done += sw_schedule_parallel_to_cq(sw, qid,
241 iq_num, count,
242 type == RTE_SCHED_TYPE_ORDERED);
243 }
244
245 /* Check if the IQ that was polled is now empty, and unset it
246 * in the IQ mask if its empty.
247 */
248 int all_done = (pkts_done == count);
249
250 qid->iq_pkt_mask &= ~(all_done << (iq_num));
251 pkts += pkts_done;
252 }
253
254 return pkts;
255}
256
257/* This function will perform re-ordering of packets, and injecting into
258 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
259 * contiguous in that array, this function accepts a "range" of QIDs to scan.
260 */
261static uint16_t
262sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
263{
264 /* Perform egress reordering */
265 struct rte_event *qe;
266 uint32_t pkts_iter = 0;
267
268 for (; qid_start < qid_end; qid_start++) {
269 struct sw_qid *qid = &sw->qids[qid_start];
270 int i, num_entries_in_use;
271
272 if (qid->type != RTE_SCHED_TYPE_ORDERED)
273 continue;
274
275 num_entries_in_use = rte_ring_free_count(
276 qid->reorder_buffer_freelist);
277
278 for (i = 0; i < num_entries_in_use; i++) {
279 struct reorder_buffer_entry *entry;
280 int j;
281
282 entry = &qid->reorder_buffer[qid->reorder_buffer_index];
283
284 if (!entry->ready)
285 break;
286
287 for (j = 0; j < entry->num_fragments; j++) {
288 uint16_t dest_qid;
289 uint16_t dest_iq;
290
291 int idx = entry->fragment_index + j;
292 qe = &entry->fragments[idx];
293
294 dest_qid = qe->queue_id;
295 dest_iq = PRIO_TO_IQ(qe->priority);
296
297 if (dest_qid >= sw->qid_count) {
298 sw->stats.rx_dropped++;
299 continue;
300 }
301
302 pkts_iter++;
303
304 struct sw_qid *q = &sw->qids[dest_qid];
305 struct sw_iq *iq = &q->iq[dest_iq];
306
307 /* we checked for space above, so enqueue must
308 * succeed
309 */
310 iq_enqueue(sw, iq, qe);
311 q->iq_pkt_mask |= (1 << (dest_iq));
312 q->iq_pkt_count[dest_iq]++;
313 q->stats.rx_pkts++;
314 }
315
316 entry->ready = (j != entry->num_fragments);
317 entry->num_fragments -= j;
318 entry->fragment_index += j;
319
320 if (!entry->ready) {
321 entry->fragment_index = 0;
322
323 rte_ring_sp_enqueue(
324 qid->reorder_buffer_freelist,
325 entry);
326
327 qid->reorder_buffer_index++;
328 qid->reorder_buffer_index %= qid->window_size;
329 }
330 }
331 }
332 return pkts_iter;
333}
334
335static __rte_always_inline void
336sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
337{
338 RTE_SET_USED(sw);
339 struct rte_event_ring *worker = port->rx_worker_ring;
340 port->pp_buf_start = 0;
341 port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
342 RTE_DIM(port->pp_buf), NULL);
343}
344
345static __rte_always_inline uint32_t
346__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
347{
348 static struct reorder_buffer_entry dummy_rob;
349 uint32_t pkts_iter = 0;
350 struct sw_port *port = &sw->ports[port_id];
351
352 /* If shadow ring has 0 pkts, pull from worker ring */
353 if (port->pp_buf_count == 0)
354 sw_refill_pp_buf(sw, port);
355
356 while (port->pp_buf_count) {
357 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
358 struct sw_hist_list_entry *hist_entry = NULL;
359 uint8_t flags = qe->op;
360 const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
361 int needs_reorder = 0;
362 /* if no-reordering, having PARTIAL == NEW */
363 if (!allow_reorder && !eop)
364 flags = QE_FLAG_VALID;
365
366 /*
367 * if we don't have space for this packet in an IQ,
368 * then move on to next queue. Technically, for a
369 * packet that needs reordering, we don't need to check
370 * here, but it simplifies things not to special-case
371 */
372 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
373 struct sw_qid *qid = &sw->qids[qe->queue_id];
374
375 /* now process based on flags. Note that for directed
376 * queues, the enqueue_flush masks off all but the
377 * valid flag. This makes FWD and PARTIAL enqueues just
378 * NEW type, and makes DROPS no-op calls.
379 */
380 if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
381 const uint32_t hist_tail = port->hist_tail &
382 (SW_PORT_HIST_LIST - 1);
383
384 hist_entry = &port->hist_list[hist_tail];
385 const uint32_t hist_qid = hist_entry->qid;
386 const uint32_t hist_fid = hist_entry->fid;
387
388 struct sw_fid_t *fid =
389 &sw->qids[hist_qid].fids[hist_fid];
390 fid->pcount -= eop;
391 if (fid->pcount == 0)
392 fid->cq = -1;
393
394 if (allow_reorder) {
395 /* set reorder ready if an ordered QID */
396 uintptr_t rob_ptr =
397 (uintptr_t)hist_entry->rob_entry;
398 const uintptr_t valid = (rob_ptr != 0);
399 needs_reorder = valid;
400 rob_ptr |=
401 ((valid - 1) & (uintptr_t)&dummy_rob);
402 struct reorder_buffer_entry *tmp_rob_ptr =
403 (struct reorder_buffer_entry *)rob_ptr;
404 tmp_rob_ptr->ready = eop * needs_reorder;
405 }
406
407 port->inflights -= eop;
408 port->hist_tail += eop;
409 }
410 if (flags & QE_FLAG_VALID) {
411 port->stats.rx_pkts++;
412
413 if (allow_reorder && needs_reorder) {
414 struct reorder_buffer_entry *rob_entry =
415 hist_entry->rob_entry;
416
417 hist_entry->rob_entry = NULL;
418 /* Although fragmentation not currently
419 * supported by eventdev API, we support it
420 * here. Open: How do we alert the user that
421 * they've exceeded max frags?
422 */
423 int num_frag = rob_entry->num_fragments;
424 if (num_frag == SW_FRAGMENTS_MAX)
425 sw->stats.rx_dropped++;
426 else {
427 int idx = rob_entry->num_fragments++;
428 rob_entry->fragments[idx] = *qe;
429 }
430 goto end_qe;
431 }
432
433 /* Use the iq_num from above to push the QE
434 * into the qid at the right priority
435 */
436
437 qid->iq_pkt_mask |= (1 << (iq_num));
438 iq_enqueue(sw, &qid->iq[iq_num], qe);
439 qid->iq_pkt_count[iq_num]++;
440 qid->stats.rx_pkts++;
441 pkts_iter++;
442 }
443
444end_qe:
445 port->pp_buf_start++;
446 port->pp_buf_count--;
447 } /* while (avail_qes) */
448
449 return pkts_iter;
450}
451
452static uint32_t
453sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
454{
455 return __pull_port_lb(sw, port_id, 1);
456}
457
458static uint32_t
459sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
460{
461 return __pull_port_lb(sw, port_id, 0);
462}
463
464static uint32_t
465sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
466{
467 uint32_t pkts_iter = 0;
468 struct sw_port *port = &sw->ports[port_id];
469
470 /* If shadow ring has 0 pkts, pull from worker ring */
471 if (port->pp_buf_count == 0)
472 sw_refill_pp_buf(sw, port);
473
474 while (port->pp_buf_count) {
475 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
476 uint8_t flags = qe->op;
477
478 if ((flags & QE_FLAG_VALID) == 0)
479 goto end_qe;
480
481 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
482 struct sw_qid *qid = &sw->qids[qe->queue_id];
483 struct sw_iq *iq = &qid->iq[iq_num];
484
485 port->stats.rx_pkts++;
486
487 /* Use the iq_num from above to push the QE
488 * into the qid at the right priority
489 */
490 qid->iq_pkt_mask |= (1 << (iq_num));
491 iq_enqueue(sw, iq, qe);
492 qid->iq_pkt_count[iq_num]++;
493 qid->stats.rx_pkts++;
494 pkts_iter++;
495
496end_qe:
497 port->pp_buf_start++;
498 port->pp_buf_count--;
499 } /* while port->pp_buf_count */
500
501 return pkts_iter;
502}
503
504void
505sw_event_schedule(struct rte_eventdev *dev)
506{
507 struct sw_evdev *sw = sw_pmd_priv(dev);
508 uint32_t in_pkts, out_pkts;
509 uint32_t out_pkts_total = 0, in_pkts_total = 0;
510 int32_t sched_quanta = sw->sched_quanta;
511 uint32_t i;
512
513 sw->sched_called++;
514 if (unlikely(!sw->started))
515 return;
516
517 do {
518 uint32_t in_pkts_this_iteration = 0;
519
520 /* Pull from rx_ring for ports */
521 do {
522 in_pkts = 0;
9f95a23c
TL
523 for (i = 0; i < sw->port_count; i++) {
524 /* ack the unlinks in progress as done */
525 if (sw->ports[i].unlinks_in_progress)
526 sw->ports[i].unlinks_in_progress = 0;
527
11fdf7f2
TL
528 if (sw->ports[i].is_directed)
529 in_pkts += sw_schedule_pull_port_dir(sw, i);
530 else if (sw->ports[i].num_ordered_qids > 0)
531 in_pkts += sw_schedule_pull_port_lb(sw, i);
532 else
533 in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
9f95a23c 534 }
11fdf7f2
TL
535
536 /* QID scan for re-ordered */
537 in_pkts += sw_schedule_reorder(sw, 0,
538 sw->qid_count);
539 in_pkts_this_iteration += in_pkts;
540 } while (in_pkts > 4 &&
541 (int)in_pkts_this_iteration < sched_quanta);
542
543 out_pkts = sw_schedule_qid_to_cq(sw);
544 out_pkts_total += out_pkts;
545 in_pkts_total += in_pkts_this_iteration;
546
547 if (in_pkts == 0 && out_pkts == 0)
548 break;
549 } while ((int)out_pkts_total < sched_quanta);
550
551 sw->stats.tx_pkts += out_pkts_total;
552 sw->stats.rx_pkts += in_pkts_total;
553
554 sw->sched_no_iq_enqueues += (in_pkts_total == 0);
555 sw->sched_no_cq_enqueues += (out_pkts_total == 0);
556
557 /* push all the internal buffered QEs in port->cq_ring to the
558 * worker cores: aka, do the ring transfers batched.
559 */
560 for (i = 0; i < sw->port_count; i++) {
561 struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
562 rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
563 sw->ports[i].cq_buf_count,
564 &sw->cq_ring_space[i]);
565 sw->ports[i].cq_buf_count = 0;
566 }
567
568}