1 use std
::sync
::atomic
::Ordering
::SeqCst
;
2 use std
::sync
::atomic
::{AtomicBool, AtomicUsize}
;
3 use std
::sync
::{Arc, Mutex}
;
5 use crossbeam_deque
::Steal
::{Empty, Success}
;
6 use crossbeam_deque
::Worker
;
7 use crossbeam_utils
::thread
::scope
;
12 let w
= Worker
::new_lifo();
14 assert_eq
!(w
.pop(), None
);
15 assert_eq
!(s
.steal(), Empty
);
18 assert_eq
!(w
.pop(), Some(1));
19 assert_eq
!(w
.pop(), None
);
20 assert_eq
!(s
.steal(), Empty
);
23 assert_eq
!(s
.steal(), Success(2));
24 assert_eq
!(s
.steal(), Empty
);
25 assert_eq
!(w
.pop(), None
);
30 assert_eq
!(s
.steal(), Success(3));
31 assert_eq
!(s
.steal(), Success(4));
32 assert_eq
!(s
.steal(), Success(5));
33 assert_eq
!(s
.steal(), Empty
);
39 assert_eq
!(w
.pop(), Some(9));
40 assert_eq
!(s
.steal(), Success(6));
41 assert_eq
!(w
.pop(), Some(8));
42 assert_eq
!(w
.pop(), Some(7));
43 assert_eq
!(w
.pop(), None
);
48 let w
= Worker
::new_lifo();
51 assert
!(w
.is_empty());
53 assert
!(!w
.is_empty());
55 assert
!(!w
.is_empty());
57 assert
!(!w
.is_empty());
59 assert
!(w
.is_empty());
61 assert
!(s
.is_empty());
63 assert
!(!s
.is_empty());
65 assert
!(!s
.is_empty());
67 assert
!(!s
.is_empty());
69 assert
!(s
.is_empty());
74 const STEPS
: usize = 50_000;
76 let w
= Worker
::new_lifo();
83 if let Success(v
) = s
.steal() {
90 assert_eq
!(s
.steal(), Empty
);
102 const THREADS
: usize = 8;
103 const COUNT
: usize = 50_000;
105 let w
= Worker
::new_lifo();
108 w
.push(Box
::new(i
+ 1));
110 let remaining
= Arc
::new(AtomicUsize
::new(COUNT
));
113 for _
in 0..THREADS
{
115 let remaining
= remaining
.clone();
117 scope
.spawn(move |_
| {
119 while remaining
.load(SeqCst
) > 0 {
120 if let Success(x
) = s
.steal() {
123 remaining
.fetch_sub(1, SeqCst
);
129 let mut last
= COUNT
+ 1;
130 while remaining
.load(SeqCst
) > 0 {
131 if let Some(x
) = w
.pop() {
134 remaining
.fetch_sub(1, SeqCst
);
143 const THREADS
: usize = 8;
144 const COUNT
: usize = 50_000;
146 let w
= Worker
::new_lifo();
147 let done
= Arc
::new(AtomicBool
::new(false));
148 let hits
= Arc
::new(AtomicUsize
::new(0));
151 for _
in 0..THREADS
{
153 let done
= done
.clone();
154 let hits
= hits
.clone();
156 scope
.spawn(move |_
| {
157 let w2
= Worker
::new_lifo();
159 while !done
.load(SeqCst
) {
160 if let Success(_
) = s
.steal() {
161 hits
.fetch_add(1, SeqCst
);
164 let _
= s
.steal_batch(&w2
);
166 if let Success(_
) = s
.steal_batch_and_pop(&w2
) {
167 hits
.fetch_add(1, SeqCst
);
170 while w2
.pop().is_some() {
171 hits
.fetch_add(1, SeqCst
);
177 let mut rng
= rand
::thread_rng();
178 let mut expected
= 0;
179 while expected
< COUNT
{
180 if rng
.gen_range(0..3) == 0 {
181 while w
.pop().is_some() {
182 hits
.fetch_add(1, SeqCst
);
190 while hits
.load(SeqCst
) < COUNT
{
191 while w
.pop().is_some() {
192 hits
.fetch_add(1, SeqCst
);
195 done
.store(true, SeqCst
);
202 const THREADS
: usize = 8;
203 const COUNT
: usize = 50_000;
205 let w
= Worker
::new_lifo();
206 let done
= Arc
::new(AtomicBool
::new(false));
207 let mut all_hits
= Vec
::new();
210 for _
in 0..THREADS
{
212 let done
= done
.clone();
213 let hits
= Arc
::new(AtomicUsize
::new(0));
214 all_hits
.push(hits
.clone());
216 scope
.spawn(move |_
| {
217 let w2
= Worker
::new_lifo();
219 while !done
.load(SeqCst
) {
220 if let Success(_
) = s
.steal() {
221 hits
.fetch_add(1, SeqCst
);
224 let _
= s
.steal_batch(&w2
);
226 if let Success(_
) = s
.steal_batch_and_pop(&w2
) {
227 hits
.fetch_add(1, SeqCst
);
230 while w2
.pop().is_some() {
231 hits
.fetch_add(1, SeqCst
);
237 let mut rng
= rand
::thread_rng();
240 for i
in 0..rng
.gen_range(0..COUNT
) {
241 if rng
.gen_range(0..3) == 0 && my_hits
== 0 {
242 while w
.pop().is_some() {
250 if my_hits
> 0 && all_hits
.iter().all(|h
| h
.load(SeqCst
) > 0) {
254 done
.store(true, SeqCst
);
261 const THREADS
: usize = 8;
262 const COUNT
: usize = 50_000;
263 const STEPS
: usize = 1000;
265 struct Elem(usize, Arc
<Mutex
<Vec
<usize>>>);
269 self.1.lock().unwrap().push(self.0);
273 let w
= Worker
::new_lifo();
274 let dropped
= Arc
::new(Mutex
::new(Vec
::new()));
275 let remaining
= Arc
::new(AtomicUsize
::new(COUNT
));
278 w
.push(Elem(i
, dropped
.clone()));
282 for _
in 0..THREADS
{
283 let remaining
= remaining
.clone();
286 scope
.spawn(move |_
| {
287 let w2
= Worker
::new_lifo();
291 if let Success(_
) = s
.steal() {
293 remaining
.fetch_sub(1, SeqCst
);
296 let _
= s
.steal_batch(&w2
);
298 if let Success(_
) = s
.steal_batch_and_pop(&w2
) {
300 remaining
.fetch_sub(1, SeqCst
);
303 while w2
.pop().is_some() {
305 remaining
.fetch_sub(1, SeqCst
);
312 if w
.pop().is_some() {
313 remaining
.fetch_sub(1, SeqCst
);
319 let rem
= remaining
.load(SeqCst
);
323 let mut v
= dropped
.lock().unwrap();
324 assert_eq
!(v
.len(), COUNT
- rem
);
331 let mut v
= dropped
.lock().unwrap();
332 assert_eq
!(v
.len(), rem
);
334 for pair
in v
.windows(2) {
335 assert_eq
!(pair
[0] + 1, pair
[1]);