1 extern crate crossbeam_deque
as deque
;
2 extern crate crossbeam_utils
as utils
;
5 use std
::sync
::atomic
::Ordering
::SeqCst
;
6 use std
::sync
::atomic
::{AtomicBool, AtomicUsize}
;
7 use std
::sync
::{Arc, Mutex}
;
9 use deque
::Steal
::{Empty, Success}
;
12 use utils
::thread
::scope
;
16 let w
= Worker
::new_lifo();
18 assert_eq
!(w
.pop(), None
);
19 assert_eq
!(s
.steal(), Empty
);
22 assert_eq
!(w
.pop(), Some(1));
23 assert_eq
!(w
.pop(), None
);
24 assert_eq
!(s
.steal(), Empty
);
27 assert_eq
!(s
.steal(), Success(2));
28 assert_eq
!(s
.steal(), Empty
);
29 assert_eq
!(w
.pop(), None
);
34 assert_eq
!(s
.steal(), Success(3));
35 assert_eq
!(s
.steal(), Success(4));
36 assert_eq
!(s
.steal(), Success(5));
37 assert_eq
!(s
.steal(), Empty
);
43 assert_eq
!(w
.pop(), Some(9));
44 assert_eq
!(s
.steal(), Success(6));
45 assert_eq
!(w
.pop(), Some(8));
46 assert_eq
!(w
.pop(), Some(7));
47 assert_eq
!(w
.pop(), None
);
52 let w
= Worker
::new_lifo();
55 assert
!(w
.is_empty());
57 assert
!(!w
.is_empty());
59 assert
!(!w
.is_empty());
61 assert
!(!w
.is_empty());
63 assert
!(w
.is_empty());
65 assert
!(s
.is_empty());
67 assert
!(!s
.is_empty());
69 assert
!(!s
.is_empty());
71 assert
!(!s
.is_empty());
73 assert
!(s
.is_empty());
78 const STEPS
: usize = 50_000;
80 let w
= Worker
::new_lifo();
87 if let Success(v
) = s
.steal() {
94 assert_eq
!(s
.steal(), Empty
);
106 const THREADS
: usize = 8;
107 const COUNT
: usize = 50_000;
109 let w
= Worker
::new_lifo();
112 w
.push(Box
::new(i
+ 1));
114 let remaining
= Arc
::new(AtomicUsize
::new(COUNT
));
117 for _
in 0..THREADS
{
119 let remaining
= remaining
.clone();
121 scope
.spawn(move |_
| {
123 while remaining
.load(SeqCst
) > 0 {
124 if let Success(x
) = s
.steal() {
127 remaining
.fetch_sub(1, SeqCst
);
133 let mut last
= COUNT
+ 1;
134 while remaining
.load(SeqCst
) > 0 {
135 if let Some(x
) = w
.pop() {
138 remaining
.fetch_sub(1, SeqCst
);
147 const THREADS
: usize = 8;
148 const COUNT
: usize = 50_000;
150 let w
= Worker
::new_lifo();
151 let done
= Arc
::new(AtomicBool
::new(false));
152 let hits
= Arc
::new(AtomicUsize
::new(0));
155 for _
in 0..THREADS
{
157 let done
= done
.clone();
158 let hits
= hits
.clone();
160 scope
.spawn(move |_
| {
161 let w2
= Worker
::new_lifo();
163 while !done
.load(SeqCst
) {
164 if let Success(_
) = s
.steal() {
165 hits
.fetch_add(1, SeqCst
);
168 let _
= s
.steal_batch(&w2
);
170 if let Success(_
) = s
.steal_batch_and_pop(&w2
) {
171 hits
.fetch_add(1, SeqCst
);
174 while let Some(_
) = w2
.pop() {
175 hits
.fetch_add(1, SeqCst
);
181 let mut rng
= rand
::thread_rng();
182 let mut expected
= 0;
183 while expected
< COUNT
{
184 if rng
.gen_range(0, 3) == 0 {
185 while let Some(_
) = w
.pop() {
186 hits
.fetch_add(1, SeqCst
);
194 while hits
.load(SeqCst
) < COUNT
{
195 while let Some(_
) = w
.pop() {
196 hits
.fetch_add(1, SeqCst
);
199 done
.store(true, SeqCst
);
206 const THREADS
: usize = 8;
207 const COUNT
: usize = 50_000;
209 let w
= Worker
::new_lifo();
210 let done
= Arc
::new(AtomicBool
::new(false));
211 let mut all_hits
= Vec
::new();
214 for _
in 0..THREADS
{
216 let done
= done
.clone();
217 let hits
= Arc
::new(AtomicUsize
::new(0));
218 all_hits
.push(hits
.clone());
220 scope
.spawn(move |_
| {
221 let w2
= Worker
::new_lifo();
223 while !done
.load(SeqCst
) {
224 if let Success(_
) = s
.steal() {
225 hits
.fetch_add(1, SeqCst
);
228 let _
= s
.steal_batch(&w2
);
230 if let Success(_
) = s
.steal_batch_and_pop(&w2
) {
231 hits
.fetch_add(1, SeqCst
);
234 while let Some(_
) = w2
.pop() {
235 hits
.fetch_add(1, SeqCst
);
241 let mut rng
= rand
::thread_rng();
244 for i
in 0..rng
.gen_range(0, COUNT
) {
245 if rng
.gen_range(0, 3) == 0 && my_hits
== 0 {
246 while let Some(_
) = w
.pop() {
254 if my_hits
> 0 && all_hits
.iter().all(|h
| h
.load(SeqCst
) > 0) {
258 done
.store(true, SeqCst
);
265 const THREADS
: usize = 8;
266 const COUNT
: usize = 50_000;
267 const STEPS
: usize = 1000;
269 struct Elem(usize, Arc
<Mutex
<Vec
<usize>>>);
273 self.1.lock().unwrap().push(self.0);
277 let w
= Worker
::new_lifo();
278 let dropped
= Arc
::new(Mutex
::new(Vec
::new()));
279 let remaining
= Arc
::new(AtomicUsize
::new(COUNT
));
282 w
.push(Elem(i
, dropped
.clone()));
286 for _
in 0..THREADS
{
287 let remaining
= remaining
.clone();
290 scope
.spawn(move |_
| {
291 let w2
= Worker
::new_lifo();
295 if let Success(_
) = s
.steal() {
297 remaining
.fetch_sub(1, SeqCst
);
300 let _
= s
.steal_batch(&w2
);
302 if let Success(_
) = s
.steal_batch_and_pop(&w2
) {
304 remaining
.fetch_sub(1, SeqCst
);
307 while let Some(_
) = w2
.pop() {
309 remaining
.fetch_sub(1, SeqCst
);
316 if let Some(_
) = w
.pop() {
317 remaining
.fetch_sub(1, SeqCst
);
323 let rem
= remaining
.load(SeqCst
);
327 let mut v
= dropped
.lock().unwrap();
328 assert_eq
!(v
.len(), COUNT
- rem
);
335 let mut v
= dropped
.lock().unwrap();
336 assert_eq
!(v
.len(), rem
);
338 for pair
in v
.windows(2) {
339 assert_eq
!(pair
[0] + 1, pair
[1]);