1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation
8 #include <rte_bus_vdev.h>
10 #include <rte_cycles.h>
11 #include <rte_memzone.h>
13 #include "opdl_evdev.h"
14 #include "opdl_ring.h"
18 static __rte_always_inline
uint32_t
19 enqueue_check(struct opdl_port
*p
,
20 const struct rte_event ev
[],
26 if (p
->opdl
->do_validation
) {
28 for (i
= 0; i
< num
; i
++) {
29 if (ev
[i
].queue_id
!= p
->next_external_qid
) {
30 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
31 "ERROR - port:[%u] - event wants"
32 " to enq to q_id[%u],"
33 " but should be [%u]",
34 opdl_pmd_dev_id(p
->opdl
),
37 p
->next_external_qid
);
44 if (p
->p_type
== OPDL_PURE_RX_PORT
||
45 p
->p_type
== OPDL_ASYNC_PORT
) {
48 p
->port_stat
[claim_pkts_requested
] += num
;
49 p
->port_stat
[claim_pkts_granted
] += num_events
;
50 p
->port_stat
[claim_non_empty
]++;
51 p
->start_cycles
= rte_rdtsc();
53 p
->port_stat
[claim_empty
]++;
57 if (p
->start_cycles
) {
58 uint64_t end_cycles
= rte_rdtsc();
59 p
->port_stat
[total_cycles
] +=
60 end_cycles
- p
->start_cycles
;
65 ev
[0].queue_id
!= p
->next_external_qid
) {
74 static __rte_always_inline
void
75 update_on_dequeue(struct opdl_port
*p
,
76 struct rte_event ev
[],
80 if (p
->opdl
->do_validation
) {
82 for (i
= 0; i
< num
; i
++)
84 p
->opdl
->queue
[p
->queue_id
].external_qid
;
88 p
->port_stat
[claim_pkts_requested
] += num
;
89 p
->port_stat
[claim_pkts_granted
] += num_events
;
90 p
->port_stat
[claim_non_empty
]++;
91 p
->start_cycles
= rte_rdtsc();
93 p
->port_stat
[claim_empty
]++;
99 p
->opdl
->queue
[p
->queue_id
].external_qid
;
111 opdl_rx_error_enqueue(struct opdl_port
*p
,
112 const struct rte_event ev
[],
127 * This function handles enqueue for a single input stage_inst with
128 * threadsafe disabled or enabled, e.g. one thread using a stage_inst or
129 * multiple threads sharing a stage_inst
133 opdl_rx_enqueue(struct opdl_port
*p
,
134 const struct rte_event ev
[],
137 uint16_t enqueued
= 0;
139 enqueued
= opdl_ring_input(opdl_stage_get_opdl_ring(p
->enq_stage_inst
),
143 if (!enqueue_check(p
, ev
, num
, enqueued
))
159 opdl_tx_error_dequeue(struct opdl_port
*p
,
160 struct rte_event ev
[],
173 * TX single threaded claim
175 * This function handles dequeue for a single worker stage_inst with
176 * threadsafe disabled, e.g. one thread using a stage_inst
180 opdl_tx_dequeue_single_thread(struct opdl_port
*p
,
181 struct rte_event ev
[],
186 struct opdl_ring
*ring
;
188 ring
= opdl_stage_get_opdl_ring(p
->deq_stage_inst
);
190 returned
= opdl_ring_copy_to_burst(ring
,
196 update_on_dequeue(p
, ev
, num
, returned
);
202 * TX multi threaded claim
204 * This function handles dequeue for multiple worker stage_inst with
205 * threadsafe disabled, e.g. multiple stage_inst each with its own instance
209 opdl_tx_dequeue_multi_inst(struct opdl_port
*p
,
210 struct rte_event ev
[],
213 uint32_t num_events
= 0;
215 num_events
= opdl_stage_claim(p
->deq_stage_inst
,
222 update_on_dequeue(p
, ev
, num
, num_events
);
224 return opdl_stage_disclaim(p
->deq_stage_inst
, num_events
, false);
229 * Worker thread claim
234 opdl_claim(struct opdl_port
*p
, struct rte_event ev
[], uint16_t num
)
236 uint32_t num_events
= 0;
238 if (unlikely(num
> MAX_OPDL_CONS_Q_DEPTH
)) {
239 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
240 "Attempt to dequeue num of events larger than port (%d) max",
241 opdl_pmd_dev_id(p
->opdl
),
248 num_events
= opdl_stage_claim(p
->deq_stage_inst
,
256 update_on_dequeue(p
, ev
, num
, num_events
);
262 * Worker thread disclaim
266 opdl_disclaim(struct opdl_port
*p
, const struct rte_event ev
[], uint16_t num
)
268 uint16_t enqueued
= 0;
272 for (i
= 0; i
< num
; i
++)
273 opdl_ring_cas_slot(p
->enq_stage_inst
, &ev
[i
],
276 enqueued
= opdl_stage_disclaim(p
->enq_stage_inst
,
280 return enqueue_check(p
, ev
, num
, enqueued
);
283 static __rte_always_inline
struct opdl_stage
*
284 stage_for_port(struct opdl_queue
*q
, unsigned int i
)
286 if (q
->q_pos
== OPDL_Q_POS_START
|| q
->q_pos
== OPDL_Q_POS_MIDDLE
)
287 return q
->ports
[i
]->enq_stage_inst
;
289 return q
->ports
[i
]->deq_stage_inst
;
292 static int opdl_add_deps(struct opdl_evdev
*device
,
298 struct opdl_ring
*ring
;
299 struct opdl_queue
*queue
= &device
->queue
[q_id
];
300 struct opdl_queue
*queue_deps
= &device
->queue
[deps_q_id
];
301 struct opdl_stage
*dep_stages
[OPDL_PORTS_MAX
];
303 /* sanity check that all stages are for same opdl ring */
304 for (i
= 0; i
< queue
->nb_ports
; i
++) {
305 struct opdl_ring
*r
=
306 opdl_stage_get_opdl_ring(stage_for_port(queue
, i
));
307 for (j
= 0; j
< queue_deps
->nb_ports
; j
++) {
308 struct opdl_ring
*rj
=
309 opdl_stage_get_opdl_ring(
310 stage_for_port(queue_deps
, j
));
312 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
313 "Stages and dependents"
314 " are not for same opdl ring",
315 opdl_pmd_dev_id(device
));
317 for (k
= 0; k
< device
->nb_opdls
; k
++) {
318 opdl_ring_dump(device
->opdl
[k
],
326 /* Gather all stages instance in deps */
327 for (i
= 0; i
< queue_deps
->nb_ports
; i
++)
328 dep_stages
[i
] = stage_for_port(queue_deps
, i
);
331 /* Add all deps for each port->stage_inst in this queue */
332 for (i
= 0; i
< queue
->nb_ports
; i
++) {
334 ring
= opdl_stage_get_opdl_ring(stage_for_port(queue
, i
));
336 status
= opdl_stage_deps_add(ring
,
337 stage_for_port(queue
, i
),
338 queue
->ports
[i
]->num_instance
,
339 queue
->ports
[i
]->instance_id
,
341 queue_deps
->nb_ports
);
350 opdl_add_event_handlers(struct rte_eventdev
*dev
)
354 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
357 for (i
= 0; i
< device
->max_port_nb
; i
++) {
359 struct opdl_port
*port
= &device
->ports
[i
];
361 if (port
->configured
) {
362 if (port
->p_type
== OPDL_PURE_RX_PORT
) {
363 port
->enq
= opdl_rx_enqueue
;
364 port
->deq
= opdl_tx_error_dequeue
;
366 } else if (port
->p_type
== OPDL_PURE_TX_PORT
) {
368 port
->enq
= opdl_rx_error_enqueue
;
370 if (port
->num_instance
== 1)
372 opdl_tx_dequeue_single_thread
;
374 port
->deq
= opdl_tx_dequeue_multi_inst
;
376 } else if (port
->p_type
== OPDL_REGULAR_PORT
) {
378 port
->enq
= opdl_disclaim
;
379 port
->deq
= opdl_claim
;
381 } else if (port
->p_type
== OPDL_ASYNC_PORT
) {
383 port
->enq
= opdl_rx_enqueue
;
385 /* Always single instance */
386 port
->deq
= opdl_tx_dequeue_single_thread
;
388 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
389 "port:[%u] has invalid port type - ",
390 opdl_pmd_dev_id(port
->opdl
),
395 port
->initialized
= 1;
400 fprintf(stdout
, "Success - enqueue/dequeue handler(s) added\n");
405 build_all_dependencies(struct rte_eventdev
*dev
)
410 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
412 uint8_t start_qid
= 0;
414 for (i
= 0; i
< RTE_EVENT_MAX_QUEUES_PER_DEV
; i
++) {
415 struct opdl_queue
*queue
= &device
->queue
[i
];
416 if (!queue
->initialized
)
419 if (queue
->q_pos
== OPDL_Q_POS_START
) {
424 if (queue
->q_pos
== OPDL_Q_POS_MIDDLE
) {
425 err
= opdl_add_deps(device
, i
, i
-1);
427 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
428 "dependency addition for queue:[%u] - FAILED",
430 queue
->external_qid
);
435 if (queue
->q_pos
== OPDL_Q_POS_END
) {
436 /* Add this dependency */
437 err
= opdl_add_deps(device
, i
, i
-1);
439 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
440 "dependency addition for queue:[%u] - FAILED",
442 queue
->external_qid
);
445 /* Add dependency for rx on tx */
446 err
= opdl_add_deps(device
, start_qid
, i
);
448 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
449 "dependency addition for queue:[%u] - FAILED",
451 queue
->external_qid
);
458 fprintf(stdout
, "Success - dependencies built\n");
463 check_queues_linked(struct rte_eventdev
*dev
)
468 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
471 for (i
= 0; i
< RTE_EVENT_MAX_QUEUES_PER_DEV
; i
++) {
472 struct opdl_queue
*queue
= &device
->queue
[i
];
474 if (!queue
->initialized
)
477 if (queue
->external_qid
== OPDL_INVALID_QID
)
480 if (queue
->nb_ports
== 0) {
481 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
482 "queue:[%u] has no associated ports",
490 if ((i
- nb_iq
) != device
->max_queue_nb
) {
491 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
492 "%u queues counted but should be %u",
495 device
->max_queue_nb
);
504 destroy_queues_and_rings(struct rte_eventdev
*dev
)
506 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
509 for (i
= 0; i
< device
->nb_opdls
; i
++) {
511 opdl_ring_free(device
->opdl
[i
]);
514 memset(&device
->queue
,
516 sizeof(struct opdl_queue
)
517 * RTE_EVENT_MAX_QUEUES_PER_DEV
);
/*
 * OPDL_ID(d) - expands to (d)->nb_opdls - 1 for a device pointer d.
 *
 * initialise_queue() stores this value in queue->opdl_id.
 * NOTE(review): presumably the index of the most recently created opdl
 * ring in d->opdl[] - confirm against create_opdl().
 *
 * The argument is parenthesized so that any pointer expression (for
 * example "devs + 1" or "&dev") expands correctly, not just a plain
 * identifier.
 */
#define OPDL_ID(d) ((d)->nb_opdls - 1)
522 static __rte_always_inline
void
523 initialise_queue(struct opdl_evdev
*device
,
527 struct opdl_queue
*queue
= &device
->queue
[device
->nb_queues
];
530 queue
->q_type
= OPDL_Q_TYPE_ORDERED
;
531 queue
->external_qid
= OPDL_INVALID_QID
;
533 queue
->q_type
= device
->q_md
[i
].type
;
534 queue
->external_qid
= device
->q_md
[i
].ext_id
;
535 /* Add ex->in for queues setup */
536 device
->q_map_ex_to_in
[queue
->external_qid
] = device
->nb_queues
;
538 queue
->opdl_id
= OPDL_ID(device
);
541 queue
->configured
= 1;
547 static __rte_always_inline
int
548 create_opdl(struct opdl_evdev
*device
)
552 char name
[RTE_MEMZONE_NAMESIZE
];
554 snprintf(name
, RTE_MEMZONE_NAMESIZE
,
555 "%s_%u", device
->service_name
, device
->nb_opdls
);
557 device
->opdl
[device
->nb_opdls
] =
558 opdl_ring_create(name
,
559 device
->nb_events_limit
,
560 sizeof(struct rte_event
),
561 device
->max_port_nb
* 2,
564 if (!device
->opdl
[device
->nb_opdls
]) {
565 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
566 "opdl ring %u creation - FAILED",
567 opdl_pmd_dev_id(device
),
576 static __rte_always_inline
int
577 create_link_opdl(struct opdl_evdev
*device
, uint32_t index
)
582 if (device
->q_md
[index
+ 1].type
!=
583 OPDL_Q_TYPE_SINGLE_LINK
) {
585 /* async queue with regular
589 /* create a new opdl ring */
590 err
= create_opdl(device
);
593 * dummy queue for new opdl
595 initialise_queue(device
,
602 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
603 "queue %u, two consecutive"
604 " SINGLE_LINK queues, not allowed",
605 opdl_pmd_dev_id(device
),
614 create_queues_and_rings(struct rte_eventdev
*dev
)
618 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
620 device
->nb_queues
= 0;
622 if (device
->nb_ports
!= device
->max_port_nb
) {
623 PMD_DRV_LOG(ERR
, "Number ports setup:%u NOT EQUAL to max port"
624 " number:%u for this device",
626 device
->max_port_nb
);
631 /* We will have at least one opdl so create it now */
632 err
= create_opdl(device
);
637 /* Create 1st "dummy" queue */
638 initialise_queue(device
,
643 for (i
= 0; i
< device
->nb_q_md
; i
++) {
646 if (!device
->q_md
[i
].setup
) {
648 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
649 "queue meta data slot %u"
650 " not setup - FAILING",
655 } else if (device
->q_md
[i
].type
!=
656 OPDL_Q_TYPE_SINGLE_LINK
) {
658 if (!device
->q_md
[i
+ 1].setup
) {
659 /* Create a simple ORDERED/ATOMIC
662 initialise_queue(device
,
667 /* Create a simple ORDERED/ATOMIC
668 * queue in the middle
670 initialise_queue(device
,
674 } else if (device
->q_md
[i
].type
==
675 OPDL_Q_TYPE_SINGLE_LINK
) {
677 /* create last queue for this opdl */
678 initialise_queue(device
,
682 err
= create_link_opdl(device
, i
);
692 destroy_queues_and_rings(dev
);
699 initialise_all_other_ports(struct rte_eventdev
*dev
)
702 struct opdl_stage
*stage_inst
= NULL
;
704 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
707 for (i
= 0; i
< device
->nb_ports
; i
++) {
708 struct opdl_port
*port
= &device
->ports
[i
];
709 struct opdl_queue
*queue
= &device
->queue
[port
->queue_id
];
711 if (port
->queue_id
== 0) {
713 } else if (queue
->q_type
!= OPDL_Q_TYPE_SINGLE_LINK
) {
715 if (queue
->q_pos
== OPDL_Q_POS_MIDDLE
) {
717 /* Regular port with claim/disclaim */
718 stage_inst
= opdl_stage_add(
719 device
->opdl
[queue
->opdl_id
],
722 port
->deq_stage_inst
= stage_inst
;
723 port
->enq_stage_inst
= stage_inst
;
725 if (queue
->q_type
== OPDL_Q_TYPE_ATOMIC
)
726 port
->atomic_claim
= true;
728 port
->atomic_claim
= false;
730 port
->p_type
= OPDL_REGULAR_PORT
;
732 /* Add the port to the queue array of ports */
733 queue
->ports
[queue
->nb_ports
] = port
;
734 port
->instance_id
= queue
->nb_ports
;
736 opdl_stage_set_queue_id(stage_inst
,
739 } else if (queue
->q_pos
== OPDL_Q_POS_END
) {
742 stage_inst
= opdl_stage_add(
743 device
->opdl
[queue
->opdl_id
],
746 port
->deq_stage_inst
= stage_inst
;
747 port
->enq_stage_inst
= NULL
;
748 port
->p_type
= OPDL_PURE_TX_PORT
;
750 /* Add the port to the queue array of ports */
751 queue
->ports
[queue
->nb_ports
] = port
;
752 port
->instance_id
= queue
->nb_ports
;
756 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
757 "port %u:, linked incorrectly"
758 " to a q_pos START/INVALID %u",
759 opdl_pmd_dev_id(port
->opdl
),
766 } else if (queue
->q_type
== OPDL_Q_TYPE_SINGLE_LINK
) {
768 port
->p_type
= OPDL_ASYNC_PORT
;
771 stage_inst
= opdl_stage_add(
772 device
->opdl
[queue
->opdl_id
],
774 false); /* First stage */
775 port
->deq_stage_inst
= stage_inst
;
777 /* Add the port to the queue array of ports */
778 queue
->ports
[queue
->nb_ports
] = port
;
779 port
->instance_id
= queue
->nb_ports
;
782 if (queue
->nb_ports
> 1) {
783 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
784 "queue %u:, setup as SINGLE_LINK"
785 " but has more than one port linked",
786 opdl_pmd_dev_id(port
->opdl
),
787 queue
->external_qid
);
792 /* -- single instance rx for next opdl -- */
794 device
->q_map_ex_to_in
[queue
->external_qid
] + 1;
795 if (next_qid
< RTE_EVENT_MAX_QUEUES_PER_DEV
&&
796 device
->queue
[next_qid
].configured
) {
798 /* Remap the queue */
799 queue
= &device
->queue
[next_qid
];
801 stage_inst
= opdl_stage_add(
802 device
->opdl
[queue
->opdl_id
],
805 port
->enq_stage_inst
= stage_inst
;
807 /* Add the port to the queue array of ports */
808 queue
->ports
[queue
->nb_ports
] = port
;
809 port
->instance_id
= queue
->nb_ports
;
811 if (queue
->nb_ports
> 1) {
812 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
813 "dummy queue %u: for "
815 "SINGLE_LINK but has more "
816 "than one port linked",
817 opdl_pmd_dev_id(port
->opdl
),
823 /* Set this queue to initialized as it is never
824 * referenced by any ports
826 queue
->initialized
= 1;
831 /* Now that all ports are initialised we need to
832 * setup the last bit of stage md
835 for (i
= 0; i
< device
->nb_ports
; i
++) {
836 struct opdl_port
*port
= &device
->ports
[i
];
837 struct opdl_queue
*queue
=
838 &device
->queue
[port
->queue_id
];
840 if (port
->configured
&&
841 (port
->queue_id
!= OPDL_INVALID_QID
)) {
842 if (queue
->nb_ports
== 0) {
843 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
844 "queue:[%u] has no ports"
846 opdl_pmd_dev_id(port
->opdl
),
852 port
->num_instance
= queue
->nb_ports
;
853 port
->initialized
= 1;
854 queue
->initialized
= 1;
856 PMD_DRV_LOG(ERR
, "DEV_ID:[%02d] : "
857 "Port:[%u] not configured invalid"
858 " queue configuration",
859 opdl_pmd_dev_id(port
->opdl
),
870 initialise_queue_zero_ports(struct rte_eventdev
*dev
)
874 struct opdl_stage
*stage_inst
= NULL
;
875 struct opdl_queue
*queue
= NULL
;
877 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
879 /* Assign queue zero and figure out how many Q0 ports we have */
881 for (i
= 0; i
< device
->nb_ports
; i
++) {
882 struct opdl_port
*port
= &device
->ports
[i
];
883 if (port
->queue_id
== OPDL_INVALID_QID
) {
885 port
->external_qid
= OPDL_INVALID_QID
;
886 port
->p_type
= OPDL_PURE_RX_PORT
;
891 /* Create the stage */
892 stage_inst
= opdl_stage_add(device
->opdl
[0],
893 (mt_rx
> 1 ? true : false),
897 /* Assign the new created input stage to all relevant ports */
898 for (i
= 0; i
< device
->nb_ports
; i
++) {
899 struct opdl_port
*port
= &device
->ports
[i
];
900 if (port
->queue_id
== 0) {
901 queue
= &device
->queue
[port
->queue_id
];
902 port
->enq_stage_inst
= stage_inst
;
903 port
->deq_stage_inst
= NULL
;
904 port
->configured
= 1;
905 port
->initialized
= 1;
907 queue
->ports
[queue
->nb_ports
] = port
;
908 port
->instance_id
= queue
->nb_ports
;
919 assign_internal_queue_ids(struct rte_eventdev
*dev
)
922 struct opdl_evdev
*device
= opdl_pmd_priv(dev
);
925 for (i
= 0; i
< device
->nb_ports
; i
++) {
926 struct opdl_port
*port
= &device
->ports
[i
];
927 if (port
->external_qid
!= OPDL_INVALID_QID
) {
929 device
->q_map_ex_to_in
[port
->external_qid
];
931 /* Now do the external_qid of the next queue */
932 struct opdl_queue
*queue
=
933 &device
->queue
[port
->queue_id
];
934 if (queue
->q_pos
== OPDL_Q_POS_END
)
935 port
->next_external_qid
=
936 device
->queue
[port
->queue_id
+ 2].external_qid
;
938 port
->next_external_qid
=
939 device
->queue
[port
->queue_id
+ 1].external_qid
;