/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_QUEUE_H_
#define LTHREAD_QUEUE_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <string.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>
#include <rte_malloc.h>	/* rte_malloc_socket(), rte_free() */

#include "lthread_int.h"
#include "lthread.h"
#include "lthread_diag.h"
#include "lthread_pool.h"

struct lthread_queue;
/*
 * This file implements an unbounded FIFO queue based on a lock-free
 * linked list.
 *
 * The queue is non-intrusive in that it uses intermediate nodes, and does
 * not require these nodes to be embedded in the object being placed
 * in the queue.
 *
 * This is slightly more efficient than the very similar queue in lthread_pool
 * in that it does not have to swing a stub node as the queue becomes empty.
 *
 * The queue access functions allocate and free the intermediate nodes
 * transparently from/to a per-scheduler pool (see lthread_pool.h).
 *
 * The queue provides both MPSC and SPSC insert methods.
 * An illustrative usage sketch appears at the end of this file.
 */

/*
 * Define a queue of lthread nodes.
 * Producers insert at head; the single consumer removes at tail. The
 * tail pointer is cache aligned so the two ends of the queue fall on
 * separate cache lines and do not false-share.
 */
struct lthread_queue {
	struct qnode *head;
	struct qnode *tail __rte_cache_aligned;
	struct lthread_queue *p;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(size);

} __rte_cache_aligned;

static inline struct lthread_queue *
_lthread_queue_create(const char *name)
{
	struct qnode *stub;
	struct lthread_queue *new_queue;

	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
	if (new_queue == NULL)
		return NULL;

	/* allocate the stub node */
	stub = _qnode_alloc();
	RTE_ASSERT(stub);
	if (stub == NULL) {
		rte_free(new_queue);
		return NULL;
	}

	if (name != NULL)
		strncpy(new_queue->name, name, sizeof(new_queue->name));
	else
		new_queue->name[0] = 0;
	new_queue->name[sizeof(new_queue->name)-1] = 0;

	/* initialize queue as empty: head and tail both point at the stub */
	stub->next = NULL;
	new_queue->head = stub;
	new_queue->tail = stub;

	DIAG_COUNT_INIT(new_queue, rd);
	DIAG_COUNT_INIT(new_queue, wr);
	DIAG_COUNT_INIT(new_queue, size);

	return new_queue;
}

/**
 * Return true if the queue is empty.
 */
static __rte_always_inline int
_lthread_queue_empty(struct lthread_queue *q)
{
	return q->tail == q->head;
}

/**
 * Destroy a queue.
 * Fails if the queue is not empty.
 */
static inline int _lthread_queue_destroy(struct lthread_queue *q)
{
	if (q == NULL)
		return -1;

	if (!_lthread_queue_empty(q))
		return -1;

	/* the only remaining node is the stub */
	_qnode_free(q->head);
	rte_free(q);
	return 0;
}

/* per-lcore pointer to the scheduler running on the current lcore */
RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);

/*
 * Insert a node into a queue.
 * This implementation is multi-producer safe.
 */
static __rte_always_inline struct qnode *
_lthread_queue_insert_mp(struct lthread_queue *q, void *data)
{
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set object in node */
	n->data = data;
	n->next = NULL;

	/*
	 * This is an MPSC method: atomically exchange the head pointer
	 * with the new node, then link the displaced node behind it.
	 */
	prev = (struct qnode *)__sync_lock_test_and_set((uint64_t *)&q->head,
							(uint64_t)n);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
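
/*
 * Editor's sketch (assumption, not part of the original source): the
 * GCC builtin above acts as an atomic exchange of q->head; with C11
 * atomics the same publication step could be written as
 *
 *	#include <stdatomic.h>
 *
 *	prev = atomic_exchange((_Atomic(struct qnode *) *)&q->head, n);
 *	prev->next = n;
 *
 * Any number of producers may race on the exchange; each links its node
 * behind whichever node it displaced, so the list is always reconnected
 * once every producer has completed its second store.
 */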

/*
 * Insert a node into a queue in single-producer mode.
 * This implementation is NOT multi-producer safe.
 */
static __rte_always_inline struct qnode *
_lthread_queue_insert_sp(struct lthread_queue *q, void *data)
{
	/* allocate a queue node */
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set data in node */
	n->data = data;
	n->next = NULL;

	/* this is an SPSC method, no need for a locked exchange operation */
	prev = q->head;
	prev->next = q->head = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
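
/*
 * Editor's note (illustrative): with two concurrent producers the plain
 * stores above can interleave, e.g.
 *
 *	P0: prev = q->head;
 *	P1: prev = q->head;		(reads the same node)
 *	P0: prev->next = q->head = n0;
 *	P1: prev->next = q->head = n1;	(n0 is unlinked and lost)
 *
 * which is why concurrent producers must use the exchange-based
 * _lthread_queue_insert_mp() instead.
 */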

/*
 * Remove a node from a queue (single attempt).
 */
static __rte_always_inline void *
_lthread_queue_poll(struct lthread_queue *q)
{
	void *data = NULL;
	struct qnode *tail = q->tail;
	struct qnode *next = (struct qnode *)tail->next;
	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time.
	 * The consumer must handle this by retrying.
	 */

	if (likely(next != NULL)) {
		/* swing tail to the next node and take its payload */
		q->tail = next;
		data = next->data;

		/* free the displaced node */
		_qnode_free(tail);

		DIAG_COUNT_INC(q, rd);
		DIAG_COUNT_DEC(q, size);
		return data;
	}
	return NULL;
}

/*
 * Remove a node from a queue, retrying across the window of
 * inconsistency between producer and consumer.
 */
static __rte_always_inline void *
_lthread_queue_remove(struct lthread_queue *q)
{
	void *data = NULL;

	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time. We handle this by retrying.
	 */
	do {
		data = _lthread_queue_poll(q);

		if (likely(data != NULL))
			/* the diag counters were already updated in poll */
			return data;
		rte_compiler_barrier();
	} while (unlikely(!_lthread_queue_empty(q)));
	return NULL;
}

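/*
 * Editor's usage sketch, referenced from the header comment above. It
 * is illustrative only: the function name is hypothetical, and it
 * assumes it runs under an lthread scheduler so that the per-scheduler
 * pool backing _qnode_alloc()/_qnode_free() is available.
 */
static inline int
_lthread_queue_usage_example(void)
{
	static int item = 42;
	struct lthread_queue *q = _lthread_queue_create("example");
	void *data;

	if (q == NULL)
		return -1;

	/* any thread may insert with the multi-producer variant */
	_lthread_queue_insert_mp(q, &item);

	/* only the owning (single) consumer may remove */
	data = _lthread_queue_remove(q);
	RTE_ASSERT(data == &item);
	(void)data;

	/* destroy returns -1 unless the queue is empty */
	return _lthread_queue_destroy(q);
}
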
#ifdef __cplusplus
}
#endif

#endif /* LTHREAD_QUEUE_H_ */