1 extern crate crossbeam_deque
as deque
;
2 extern crate crossbeam_utils
as utils
;
5 use std
::sync
::atomic
::Ordering
::SeqCst
;
6 use std
::sync
::atomic
::{AtomicBool, AtomicUsize}
;
7 use std
::sync
::{Arc, Mutex}
;
9 use deque
::Steal
::{Empty, Success}
;
10 use deque
::{Injector, Worker}
;
12 use utils
::thread
::scope
;
16 let q
= Injector
::new();
17 assert_eq
!(q
.steal(), Empty
);
21 assert_eq
!(q
.steal(), Success(1));
22 assert_eq
!(q
.steal(), Success(2));
23 assert_eq
!(q
.steal(), Empty
);
26 assert_eq
!(q
.steal(), Success(3));
27 assert_eq
!(q
.steal(), Empty
);
32 let q
= Injector
::new();
33 assert
!(q
.is_empty());
36 assert
!(!q
.is_empty());
38 assert
!(!q
.is_empty());
41 assert
!(!q
.is_empty());
43 assert
!(q
.is_empty());
46 assert
!(!q
.is_empty());
48 assert
!(q
.is_empty());
53 const COUNT
: usize = 100_000;
55 let q
= Injector
::new();
61 if let Success(v
) = q
.steal() {
68 assert_eq
!(q
.steal(), Empty
);
80 const COUNT
: usize = 25_000;
81 const THREADS
: usize = 4;
83 let q
= Injector
::new();
84 let v
= (0..COUNT
).map(|_
| AtomicUsize
::new(0)).collect
::<Vec
<_
>>();
99 if let Success(n
) = q
.steal() {
100 v
[n
].fetch_add(1, SeqCst
);
111 assert_eq
!(c
.load(SeqCst
), THREADS
);
117 const THREADS
: usize = 8;
118 const COUNT
: usize = 50_000;
120 let q
= Injector
::new();
123 q
.push(Box
::new(i
+ 1));
125 let remaining
= Arc
::new(AtomicUsize
::new(COUNT
));
128 for _
in 0..THREADS
{
129 let remaining
= remaining
.clone();
132 scope
.spawn(move |_
| {
134 while remaining
.load(SeqCst
) > 0 {
135 if let Success(x
) = q
.steal() {
138 remaining
.fetch_sub(1, SeqCst
);
145 while remaining
.load(SeqCst
) > 0 {
146 if let Success(x
) = q
.steal() {
149 remaining
.fetch_sub(1, SeqCst
);
158 const THREADS
: usize = 8;
159 const COUNT
: usize = 50_000;
161 let q
= Injector
::new();
162 let done
= Arc
::new(AtomicBool
::new(false));
163 let hits
= Arc
::new(AtomicUsize
::new(0));
166 for _
in 0..THREADS
{
167 let done
= done
.clone();
168 let hits
= hits
.clone();
171 scope
.spawn(move |_
| {
172 let w2
= Worker
::new_fifo();
174 while !done
.load(SeqCst
) {
175 if let Success(_
) = q
.steal() {
176 hits
.fetch_add(1, SeqCst
);
179 let _
= q
.steal_batch(&w2
);
181 if let Success(_
) = q
.steal_batch_and_pop(&w2
) {
182 hits
.fetch_add(1, SeqCst
);
185 while let Some(_
) = w2
.pop() {
186 hits
.fetch_add(1, SeqCst
);
192 let mut rng
= rand
::thread_rng();
193 let mut expected
= 0;
194 while expected
< COUNT
{
195 if rng
.gen_range(0, 3) == 0 {
196 while let Success(_
) = q
.steal() {
197 hits
.fetch_add(1, SeqCst
);
205 while hits
.load(SeqCst
) < COUNT
{
206 while let Success(_
) = q
.steal() {
207 hits
.fetch_add(1, SeqCst
);
210 done
.store(true, SeqCst
);
217 const THREADS
: usize = 8;
218 const COUNT
: usize = 50_000;
220 let q
= Injector
::new();
221 let done
= Arc
::new(AtomicBool
::new(false));
222 let mut all_hits
= Vec
::new();
225 for _
in 0..THREADS
{
226 let done
= done
.clone();
227 let hits
= Arc
::new(AtomicUsize
::new(0));
228 all_hits
.push(hits
.clone());
231 scope
.spawn(move |_
| {
232 let w2
= Worker
::new_fifo();
234 while !done
.load(SeqCst
) {
235 if let Success(_
) = q
.steal() {
236 hits
.fetch_add(1, SeqCst
);
239 let _
= q
.steal_batch(&w2
);
241 if let Success(_
) = q
.steal_batch_and_pop(&w2
) {
242 hits
.fetch_add(1, SeqCst
);
245 while let Some(_
) = w2
.pop() {
246 hits
.fetch_add(1, SeqCst
);
252 let mut rng
= rand
::thread_rng();
255 for i
in 0..rng
.gen_range(0, COUNT
) {
256 if rng
.gen_range(0, 3) == 0 && my_hits
== 0 {
257 while let Success(_
) = q
.steal() {
265 if my_hits
> 0 && all_hits
.iter().all(|h
| h
.load(SeqCst
) > 0) {
269 done
.store(true, SeqCst
);
276 const THREADS
: usize = 8;
277 const COUNT
: usize = 50_000;
278 const STEPS
: usize = 1000;
280 struct Elem(usize, Arc
<Mutex
<Vec
<usize>>>);
284 self.1.lock().unwrap().push(self.0);
288 let q
= Injector
::new();
289 let dropped
= Arc
::new(Mutex
::new(Vec
::new()));
290 let remaining
= Arc
::new(AtomicUsize
::new(COUNT
));
293 q
.push(Elem(i
, dropped
.clone()));
297 for _
in 0..THREADS
{
298 let remaining
= remaining
.clone();
301 scope
.spawn(move |_
| {
302 let w2
= Worker
::new_fifo();
306 if let Success(_
) = q
.steal() {
308 remaining
.fetch_sub(1, SeqCst
);
311 let _
= q
.steal_batch(&w2
);
313 if let Success(_
) = q
.steal_batch_and_pop(&w2
) {
315 remaining
.fetch_sub(1, SeqCst
);
318 while let Some(_
) = w2
.pop() {
320 remaining
.fetch_sub(1, SeqCst
);
327 if let Success(_
) = q
.steal() {
328 remaining
.fetch_sub(1, SeqCst
);
334 let rem
= remaining
.load(SeqCst
);
338 let mut v
= dropped
.lock().unwrap();
339 assert_eq
!(v
.len(), COUNT
- rem
);
346 let mut v
= dropped
.lock().unwrap();
347 assert_eq
!(v
.len(), rem
);
349 for pair
in v
.windows(2) {
350 assert_eq
!(pair
[0] + 1, pair
[1]);