]> git.proxmox.com Git - rustc.git/blame - vendor/parking_lot_core/src/parking_lot.rs
New upstream version 1.46.0~beta.2+dfsg1
[rustc.git] / vendor / parking_lot_core / src / parking_lot.rs
CommitLineData
ba9703b0
XL
1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
ba9703b0
XL
7use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
8use crate::util::UncheckedOptionExt;
9use crate::word_lock::WordLock;
f035d41b 10use cfg_if::cfg_if;
ba9703b0
XL
11use core::{
12 cell::{Cell, UnsafeCell},
13 ptr,
14 sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
15};
f035d41b 16use instant::Instant;
ba9703b0 17use smallvec::SmallVec;
f035d41b
XL
18use std::time::Duration;
19
20cfg_if! {
21 if #[cfg(all(
22 target_arch = "wasm32",
23 target_os = "unknown",
24 target_vendor = "unknown"
25 ))] {
26 use core::ops::Add;
27
28 #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
29 struct DummyInstant(Duration);
30
31 impl DummyInstant {
32 pub fn now() -> DummyInstant {
33 DummyInstant::zero()
34 }
35
36 const fn zero() -> DummyInstant {
37 DummyInstant(Duration::from_secs(0))
38 }
39 }
40
41 impl Add<Duration> for DummyInstant {
42 type Output = DummyInstant;
43
44 fn add(self, _rhs: Duration) -> DummyInstant {
45 DummyInstant::zero()
46 }
47 }
48
49 // Use dummy implementation for `Instant` on `wasm32`. The reason for this is
50 // that `Instant::now()` will always panic because time is currently not implemented
51 // on wasm32-unknown-unknown.
52 // See https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasm/time.rs
53 type InstantType = DummyInstant;
54 } else {
55 // Otherwise use `instant::Instant`
56 type InstantType = Instant;
57 }
58}
ba9703b0
XL
59
60static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
61
62/// Holds the pointer to the currently active `HashTable`.
63///
64/// # Safety
65///
66/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
67/// Any `HashTable` this global static has ever pointed to must never be freed.
68static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
69
70// Even with 3x more buckets than threads, the memory overhead per thread is
71// still only a few hundred bytes per thread.
72const LOAD_FACTOR: usize = 3;
73
74struct HashTable {
75 // Hash buckets for the table
76 entries: Box<[Bucket]>,
77
78 // Number of bits used for the hash function
79 hash_bits: u32,
80
81 // Previous table. This is only kept to keep leak detectors happy.
82 _prev: *const HashTable,
83}
84
85impl HashTable {
86 #[inline]
87 fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
88 let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
89 let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
90
f035d41b 91 let now = InstantType::now();
ba9703b0
XL
92 let mut entries = Vec::with_capacity(new_size);
93 for i in 0..new_size {
94 // We must ensure the seed is not zero
95 entries.push(Bucket::new(now, i as u32 + 1));
96 }
97
98 Box::new(HashTable {
99 entries: entries.into_boxed_slice(),
100 hash_bits,
101 _prev: prev,
102 })
103 }
104}
105
106#[repr(align(64))]
107struct Bucket {
108 // Lock protecting the queue
109 mutex: WordLock,
110
111 // Linked list of threads waiting on this bucket
112 queue_head: Cell<*const ThreadData>,
113 queue_tail: Cell<*const ThreadData>,
114
115 // Next time at which point be_fair should be set
116 fair_timeout: UnsafeCell<FairTimeout>,
117}
118
119impl Bucket {
120 #[inline]
f035d41b 121 pub fn new(timeout: InstantType, seed: u32) -> Self {
ba9703b0
XL
122 Self {
123 mutex: WordLock::new(),
124 queue_head: Cell::new(ptr::null()),
125 queue_tail: Cell::new(ptr::null()),
126 fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
127 }
128 }
129}
130
131struct FairTimeout {
132 // Next time at which point be_fair should be set
f035d41b 133 timeout: InstantType,
ba9703b0
XL
134
135 // the PRNG state for calculating the next timeout
136 seed: u32,
137}
138
139impl FairTimeout {
140 #[inline]
f035d41b 141 fn new(timeout: InstantType, seed: u32) -> FairTimeout {
ba9703b0
XL
142 FairTimeout { timeout, seed }
143 }
144
145 // Determine whether we should force a fair unlock, and update the timeout
146 #[inline]
147 fn should_timeout(&mut self) -> bool {
f035d41b 148 let now = InstantType::now();
ba9703b0
XL
149 if now > self.timeout {
150 // Time between 0 and 1ms.
151 let nanos = self.gen_u32() % 1_000_000;
152 self.timeout = now + Duration::new(0, nanos);
153 true
154 } else {
155 false
156 }
157 }
158
159 // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
160 fn gen_u32(&mut self) -> u32 {
161 self.seed ^= self.seed << 13;
162 self.seed ^= self.seed >> 17;
163 self.seed ^= self.seed << 5;
164 self.seed
165 }
166}
167
168struct ThreadData {
169 parker: ThreadParker,
170
171 // Key that this thread is sleeping on. This may change if the thread is
172 // requeued to a different key.
173 key: AtomicUsize,
174
175 // Linked list of parked threads in a bucket
176 next_in_queue: Cell<*const ThreadData>,
177
178 // UnparkToken passed to this thread when it is unparked
179 unpark_token: Cell<UnparkToken>,
180
181 // ParkToken value set by the thread when it was parked
182 park_token: Cell<ParkToken>,
183
184 // Is the thread parked with a timeout?
185 parked_with_timeout: Cell<bool>,
186
187 // Extra data for deadlock detection
188 #[cfg(feature = "deadlock_detection")]
189 deadlock_data: deadlock::DeadlockData,
190}
191
192impl ThreadData {
193 fn new() -> ThreadData {
194 // Keep track of the total number of live ThreadData objects and resize
195 // the hash table accordingly.
196 let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
197 grow_hashtable(num_threads);
198
199 ThreadData {
200 parker: ThreadParker::new(),
201 key: AtomicUsize::new(0),
202 next_in_queue: Cell::new(ptr::null()),
203 unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
204 park_token: Cell::new(DEFAULT_PARK_TOKEN),
205 parked_with_timeout: Cell::new(false),
206 #[cfg(feature = "deadlock_detection")]
207 deadlock_data: deadlock::DeadlockData::new(),
208 }
209 }
210}
211
212// Invokes the given closure with a reference to the current thread `ThreadData`.
213#[inline(always)]
214fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
215 // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
216 // to construct. Try to use a thread-local version if possible. Otherwise just
217 // create a ThreadData on the stack
218 let mut thread_data_storage = None;
219 thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
220 let thread_data_ptr = THREAD_DATA
221 .try_with(|x| x as *const ThreadData)
222 .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));
223
224 f(unsafe { &*thread_data_ptr })
225}
226
227impl Drop for ThreadData {
228 fn drop(&mut self) {
229 NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
230 }
231}
232
233/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
234/// The reference is valid forever. However, the `HashTable` it references might become stale
235/// at any point. Meaning it still exists, but it is not the instance in active use.
236#[inline]
237fn get_hashtable() -> &'static HashTable {
238 let table = HASHTABLE.load(Ordering::Acquire);
239
240 // If there is no table, create one
241 if table.is_null() {
242 create_hashtable()
243 } else {
244 // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
245 unsafe { &*table }
246 }
247}
248
249/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
250/// The reference is valid forever. However, the `HashTable` it references might become stale
251/// at any point. Meaning it still exists, but it is not the instance in active use.
252#[cold]
253fn create_hashtable() -> &'static HashTable {
254 let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
255
256 // If this fails then it means some other thread created the hash table first.
257 let table = match HASHTABLE.compare_exchange(
258 ptr::null_mut(),
259 new_table,
260 Ordering::AcqRel,
261 Ordering::Acquire,
262 ) {
263 Ok(_) => new_table,
264 Err(old_table) => {
265 // Free the table we created
266 // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
267 unsafe {
268 Box::from_raw(new_table);
269 }
270 old_table
271 }
272 };
273 // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
274 // created here, or it is one loaded from `HASHTABLE`.
275 unsafe { &*table }
276}
277
278// Grow the hash table so that it is big enough for the given number of threads.
279// This isn't performance-critical since it is only done when a ThreadData is
280// created, which only happens once per thread.
281fn grow_hashtable(num_threads: usize) {
282 // Lock all buckets in the existing table and get a reference to it
283 let old_table = loop {
284 let table = get_hashtable();
285
286 // Check if we need to resize the existing table
287 if table.entries.len() >= LOAD_FACTOR * num_threads {
288 return;
289 }
290
291 // Lock all buckets in the old table
292 for bucket in &table.entries[..] {
293 bucket.mutex.lock();
294 }
295
296 // Now check if our table is still the latest one. Another thread could
297 // have grown the hash table between us reading HASHTABLE and locking
298 // the buckets.
299 if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
300 break table;
301 }
302
303 // Unlock buckets and try again
304 for bucket in &table.entries[..] {
305 // SAFETY: We hold the lock here, as required
306 unsafe { bucket.mutex.unlock() };
307 }
308 };
309
310 // Create the new table
311 let mut new_table = HashTable::new(num_threads, old_table);
312
313 // Move the entries from the old table to the new one
314 for bucket in &old_table.entries[..] {
315 // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
316 // lists. All `ThreadData` instances in these lists will remain valid as long as they are
317 // present in the lists, meaning as long as their threads are parked.
318 unsafe { rehash_bucket_into(bucket, &mut new_table) };
319 }
320
321 // Publish the new table. No races are possible at this point because
322 // any other thread trying to grow the hash table is blocked on the bucket
323 // locks in the old table.
324 HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
325
326 // Unlock all buckets in the old table
327 for bucket in &old_table.entries[..] {
328 // SAFETY: We hold the lock here, as required
329 unsafe { bucket.mutex.unlock() };
330 }
331}
332
333/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
334/// in the bucket their key correspond to for this table.
335///
336/// # Safety
337///
338/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
339/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
340///
341/// The given `table` must only contain buckets with correctly constructed linked lists.
342unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
343 let mut current: *const ThreadData = bucket.queue_head.get();
344 while !current.is_null() {
345 let next = (*current).next_in_queue.get();
346 let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
347 if table.entries[hash].queue_tail.get().is_null() {
348 table.entries[hash].queue_head.set(current);
349 } else {
350 (*table.entries[hash].queue_tail.get())
351 .next_in_queue
352 .set(current);
353 }
354 table.entries[hash].queue_tail.set(current);
355 (*current).next_in_queue.set(ptr::null());
356 current = next;
357 }
358}
359
360// Hash function for addresses
361#[cfg(target_pointer_width = "32")]
362#[inline]
363fn hash(key: usize, bits: u32) -> usize {
364 key.wrapping_mul(0x9E3779B9) >> (32 - bits)
365}
366#[cfg(target_pointer_width = "64")]
367#[inline]
368fn hash(key: usize, bits: u32) -> usize {
369 key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
370}
371
372/// Locks the bucket for the given key and returns a reference to it.
373/// The returned bucket must be unlocked again in order to not cause deadlocks.
374#[inline]
375fn lock_bucket(key: usize) -> &'static Bucket {
376 loop {
377 let hashtable = get_hashtable();
378
379 let hash = hash(key, hashtable.hash_bits);
380 let bucket = &hashtable.entries[hash];
381
382 // Lock the bucket
383 bucket.mutex.lock();
384
385 // If no other thread has rehashed the table before we grabbed the lock
386 // then we are good to go! The lock we grabbed prevents any rehashes.
387 if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
388 return bucket;
389 }
390
391 // Unlock the bucket and try again
392 // SAFETY: We hold the lock here, as required
393 unsafe { bucket.mutex.unlock() };
394 }
395}
396
397/// Locks the bucket for the given key and returns a reference to it. But checks that the key
398/// hasn't been changed in the meantime due to a requeue.
399/// The returned bucket must be unlocked again in order to not cause deadlocks.
400#[inline]
401fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
402 loop {
403 let hashtable = get_hashtable();
404 let current_key = key.load(Ordering::Relaxed);
405
406 let hash = hash(current_key, hashtable.hash_bits);
407 let bucket = &hashtable.entries[hash];
408
409 // Lock the bucket
410 bucket.mutex.lock();
411
412 // Check that both the hash table and key are correct while the bucket
413 // is locked. Note that the key can't change once we locked the proper
414 // bucket for it, so we just keep trying until we have the correct key.
415 if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
416 && key.load(Ordering::Relaxed) == current_key
417 {
418 return (current_key, bucket);
419 }
420
421 // Unlock the bucket and try again
422 // SAFETY: We hold the lock here, as required
423 unsafe { bucket.mutex.unlock() };
424 }
425}
426
427/// Locks the two buckets for the given pair of keys and returns references to them.
428/// The returned buckets must be unlocked again in order to not cause deadlocks.
429///
430/// If both keys hash to the same value, both returned references will be to the same bucket. Be
431/// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
432#[inline]
433fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
434 loop {
435 let hashtable = get_hashtable();
436
437 let hash1 = hash(key1, hashtable.hash_bits);
438 let hash2 = hash(key2, hashtable.hash_bits);
439
440 // Get the bucket at the lowest hash/index first
441 let bucket1 = if hash1 <= hash2 {
442 &hashtable.entries[hash1]
443 } else {
444 &hashtable.entries[hash2]
445 };
446
447 // Lock the first bucket
448 bucket1.mutex.lock();
449
450 // If no other thread has rehashed the table before we grabbed the lock
451 // then we are good to go! The lock we grabbed prevents any rehashes.
452 if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
453 // Now lock the second bucket and return the two buckets
454 if hash1 == hash2 {
455 return (bucket1, bucket1);
456 } else if hash1 < hash2 {
457 let bucket2 = &hashtable.entries[hash2];
458 bucket2.mutex.lock();
459 return (bucket1, bucket2);
460 } else {
461 let bucket2 = &hashtable.entries[hash1];
462 bucket2.mutex.lock();
463 return (bucket2, bucket1);
464 }
465 }
466
467 // Unlock the bucket and try again
468 // SAFETY: We hold the lock here, as required
469 unsafe { bucket1.mutex.unlock() };
470 }
471}
472
473/// Unlock a pair of buckets
474///
475/// # Safety
476///
477/// Both buckets must be locked
478#[inline]
479unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
480 bucket1.mutex.unlock();
481 if !ptr::eq(bucket1, bucket2) {
482 bucket2.mutex.unlock();
483 }
484}
485
486/// Result of a park operation.
487#[derive(Copy, Clone, Eq, PartialEq, Debug)]
488pub enum ParkResult {
489 /// We were unparked by another thread with the given token.
490 Unparked(UnparkToken),
491
492 /// The validation callback returned false.
493 Invalid,
494
495 /// The timeout expired.
496 TimedOut,
497}
498
499impl ParkResult {
500 /// Returns true if we were unparked by another thread.
501 #[inline]
502 pub fn is_unparked(self) -> bool {
503 if let ParkResult::Unparked(_) = self {
504 true
505 } else {
506 false
507 }
508 }
509}
510
511/// Result of an unpark operation.
512#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
513pub struct UnparkResult {
514 /// The number of threads that were unparked.
515 pub unparked_threads: usize,
516
517 /// The number of threads that were requeued.
518 pub requeued_threads: usize,
519
520 /// Whether there are any threads remaining in the queue. This only returns
521 /// true if a thread was unparked.
522 pub have_more_threads: bool,
523
524 /// This is set to true on average once every 0.5ms for any given key. It
525 /// should be used to switch to a fair unlocking mechanism for a particular
526 /// unlock.
527 pub be_fair: bool,
528
529 /// Private field so new fields can be added without breakage.
530 _sealed: (),
531}
532
533/// Operation that `unpark_requeue` should perform.
534#[derive(Copy, Clone, Eq, PartialEq, Debug)]
535pub enum RequeueOp {
536 /// Abort the operation without doing anything.
537 Abort,
538
539 /// Unpark one thread and requeue the rest onto the target queue.
540 UnparkOneRequeueRest,
541
542 /// Requeue all threads onto the target queue.
543 RequeueAll,
544
545 /// Unpark one thread and leave the rest parked. No requeuing is done.
546 UnparkOne,
547
548 /// Requeue one thread and leave the rest parked on the original queue.
549 RequeueOne,
550}
551
552/// Operation that `unpark_filter` should perform for each thread.
553#[derive(Copy, Clone, Eq, PartialEq, Debug)]
554pub enum FilterOp {
555 /// Unpark the thread and continue scanning the list of parked threads.
556 Unpark,
557
558 /// Don't unpark the thread and continue scanning the list of parked threads.
559 Skip,
560
561 /// Don't unpark the thread and stop scanning the list of parked threads.
562 Stop,
563}
564
565/// A value which is passed from an unparker to a parked thread.
566#[derive(Copy, Clone, Eq, PartialEq, Debug)]
567pub struct UnparkToken(pub usize);
568
569/// A value associated with a parked thread which can be used by `unpark_filter`.
570#[derive(Copy, Clone, Eq, PartialEq, Debug)]
571pub struct ParkToken(pub usize);
572
573/// A default unpark token to use.
574pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
575
576/// A default park token to use.
577pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
578
579/// Parks the current thread in the queue associated with the given key.
580///
581/// The `validate` function is called while the queue is locked and can abort
582/// the operation by returning false. If `validate` returns true then the
583/// current thread is appended to the queue and the queue is unlocked.
584///
585/// The `before_sleep` function is called after the queue is unlocked but before
586/// the thread is put to sleep. The thread will then sleep until it is unparked
587/// or the given timeout is reached.
588///
589/// The `timed_out` function is also called while the queue is locked, but only
590/// if the timeout was reached. It is passed the key of the queue it was in when
591/// it timed out, which may be different from the original key if
592/// `unpark_requeue` was called. It is also passed a bool which indicates
593/// whether it was the last thread in the queue.
594///
595/// # Safety
596///
597/// You should only call this function with an address that you control, since
598/// you could otherwise interfere with the operation of other synchronization
599/// primitives.
600///
601/// The `validate` and `timed_out` functions are called while the queue is
602/// locked and must not panic or call into any function in `parking_lot`.
603///
604/// The `before_sleep` function is called outside the queue lock and is allowed
605/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
606/// it is not allowed to call `park` or panic.
607#[inline]
608pub unsafe fn park(
609 key: usize,
610 validate: impl FnOnce() -> bool,
611 before_sleep: impl FnOnce(),
612 timed_out: impl FnOnce(usize, bool),
613 park_token: ParkToken,
614 timeout: Option<Instant>,
615) -> ParkResult {
616 // Grab our thread data, this also ensures that the hash table exists
617 with_thread_data(|thread_data| {
618 // Lock the bucket for the given key
619 let bucket = lock_bucket(key);
620
621 // If the validation function fails, just return
622 if !validate() {
623 // SAFETY: We hold the lock here, as required
624 bucket.mutex.unlock();
625 return ParkResult::Invalid;
626 }
627
628 // Append our thread data to the queue and unlock the bucket
629 thread_data.parked_with_timeout.set(timeout.is_some());
630 thread_data.next_in_queue.set(ptr::null());
631 thread_data.key.store(key, Ordering::Relaxed);
632 thread_data.park_token.set(park_token);
633 thread_data.parker.prepare_park();
634 if !bucket.queue_head.get().is_null() {
635 (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
636 } else {
637 bucket.queue_head.set(thread_data);
638 }
639 bucket.queue_tail.set(thread_data);
640 // SAFETY: We hold the lock here, as required
641 bucket.mutex.unlock();
642
643 // Invoke the pre-sleep callback
644 before_sleep();
645
646 // Park our thread and determine whether we were woken up by an unpark
647 // or by our timeout. Note that this isn't precise: we can still be
648 // unparked since we are still in the queue.
649 let unparked = match timeout {
650 Some(timeout) => thread_data.parker.park_until(timeout),
651 None => {
652 thread_data.parker.park();
653 // call deadlock detection on_unpark hook
654 deadlock::on_unpark(thread_data);
655 true
656 }
657 };
658
659 // If we were unparked, return now
660 if unparked {
661 return ParkResult::Unparked(thread_data.unpark_token.get());
662 }
663
664 // Lock our bucket again. Note that the hashtable may have been rehashed in
665 // the meantime. Our key may also have changed if we were requeued.
666 let (key, bucket) = lock_bucket_checked(&thread_data.key);
667
668 // Now we need to check again if we were unparked or timed out. Unlike the
669 // last check this is precise because we hold the bucket lock.
670 if !thread_data.parker.timed_out() {
671 // SAFETY: We hold the lock here, as required
672 bucket.mutex.unlock();
673 return ParkResult::Unparked(thread_data.unpark_token.get());
674 }
675
676 // We timed out, so we now need to remove our thread from the queue
677 let mut link = &bucket.queue_head;
678 let mut current = bucket.queue_head.get();
679 let mut previous = ptr::null();
680 let mut was_last_thread = true;
681 while !current.is_null() {
682 if current == thread_data {
683 let next = (*current).next_in_queue.get();
684 link.set(next);
685 if bucket.queue_tail.get() == current {
686 bucket.queue_tail.set(previous);
687 } else {
688 // Scan the rest of the queue to see if there are any other
689 // entries with the given key.
690 let mut scan = next;
691 while !scan.is_null() {
692 if (*scan).key.load(Ordering::Relaxed) == key {
693 was_last_thread = false;
694 break;
695 }
696 scan = (*scan).next_in_queue.get();
697 }
698 }
699
700 // Callback to indicate that we timed out, and whether we were the
701 // last thread on the queue.
702 timed_out(key, was_last_thread);
703 break;
704 } else {
705 if (*current).key.load(Ordering::Relaxed) == key {
706 was_last_thread = false;
707 }
708 link = &(*current).next_in_queue;
709 previous = current;
710 current = link.get();
711 }
712 }
713
714 // There should be no way for our thread to have been removed from the queue
715 // if we timed out.
716 debug_assert!(!current.is_null());
717
718 // Unlock the bucket, we are done
719 // SAFETY: We hold the lock here, as required
720 bucket.mutex.unlock();
721 ParkResult::TimedOut
722 })
723}
724
725/// Unparks one thread from the queue associated with the given key.
726///
727/// The `callback` function is called while the queue is locked and before the
728/// target thread is woken up. The `UnparkResult` argument to the function
729/// indicates whether a thread was found in the queue and whether this was the
730/// last thread in the queue. This value is also returned by `unpark_one`.
731///
732/// The `callback` function should return an `UnparkToken` value which will be
733/// passed to the thread that is unparked. If no thread is unparked then the
734/// returned value is ignored.
735///
736/// # Safety
737///
738/// You should only call this function with an address that you control, since
739/// you could otherwise interfere with the operation of other synchronization
740/// primitives.
741///
742/// The `callback` function is called while the queue is locked and must not
743/// panic or call into any function in `parking_lot`.
744#[inline]
745pub unsafe fn unpark_one(
746 key: usize,
747 callback: impl FnOnce(UnparkResult) -> UnparkToken,
748) -> UnparkResult {
749 // Lock the bucket for the given key
750 let bucket = lock_bucket(key);
751
752 // Find a thread with a matching key and remove it from the queue
753 let mut link = &bucket.queue_head;
754 let mut current = bucket.queue_head.get();
755 let mut previous = ptr::null();
756 let mut result = UnparkResult::default();
757 while !current.is_null() {
758 if (*current).key.load(Ordering::Relaxed) == key {
759 // Remove the thread from the queue
760 let next = (*current).next_in_queue.get();
761 link.set(next);
762 if bucket.queue_tail.get() == current {
763 bucket.queue_tail.set(previous);
764 } else {
765 // Scan the rest of the queue to see if there are any other
766 // entries with the given key.
767 let mut scan = next;
768 while !scan.is_null() {
769 if (*scan).key.load(Ordering::Relaxed) == key {
770 result.have_more_threads = true;
771 break;
772 }
773 scan = (*scan).next_in_queue.get();
774 }
775 }
776
777 // Invoke the callback before waking up the thread
778 result.unparked_threads = 1;
779 result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
780 let token = callback(result);
781
782 // Set the token for the target thread
783 (*current).unpark_token.set(token);
784
785 // This is a bit tricky: we first lock the ThreadParker to prevent
786 // the thread from exiting and freeing its ThreadData if its wait
787 // times out. Then we unlock the queue since we don't want to keep
788 // the queue locked while we perform a system call. Finally we wake
789 // up the parked thread.
790 let handle = (*current).parker.unpark_lock();
791 // SAFETY: We hold the lock here, as required
792 bucket.mutex.unlock();
793 handle.unpark();
794
795 return result;
796 } else {
797 link = &(*current).next_in_queue;
798 previous = current;
799 current = link.get();
800 }
801 }
802
803 // No threads with a matching key were found in the bucket
804 callback(result);
805 // SAFETY: We hold the lock here, as required
806 bucket.mutex.unlock();
807 result
808}
809
810/// Unparks all threads in the queue associated with the given key.
811///
812/// The given `UnparkToken` is passed to all unparked threads.
813///
814/// This function returns the number of threads that were unparked.
815///
816/// # Safety
817///
818/// You should only call this function with an address that you control, since
819/// you could otherwise interfere with the operation of other synchronization
820/// primitives.
821#[inline]
822pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
823 // Lock the bucket for the given key
824 let bucket = lock_bucket(key);
825
826 // Remove all threads with the given key in the bucket
827 let mut link = &bucket.queue_head;
828 let mut current = bucket.queue_head.get();
829 let mut previous = ptr::null();
830 let mut threads = SmallVec::<[_; 8]>::new();
831 while !current.is_null() {
832 if (*current).key.load(Ordering::Relaxed) == key {
833 // Remove the thread from the queue
834 let next = (*current).next_in_queue.get();
835 link.set(next);
836 if bucket.queue_tail.get() == current {
837 bucket.queue_tail.set(previous);
838 }
839
840 // Set the token for the target thread
841 (*current).unpark_token.set(unpark_token);
842
843 // Don't wake up threads while holding the queue lock. See comment
844 // in unpark_one. For now just record which threads we need to wake
845 // up.
846 threads.push((*current).parker.unpark_lock());
847 current = next;
848 } else {
849 link = &(*current).next_in_queue;
850 previous = current;
851 current = link.get();
852 }
853 }
854
855 // Unlock the bucket
856 // SAFETY: We hold the lock here, as required
857 bucket.mutex.unlock();
858
859 // Now that we are outside the lock, wake up all the threads that we removed
860 // from the queue.
861 let num_threads = threads.len();
862 for handle in threads.into_iter() {
863 handle.unpark();
864 }
865
866 num_threads
867}
868
869/// Removes all threads from the queue associated with `key_from`, optionally
870/// unparks the first one and requeues the rest onto the queue associated with
871/// `key_to`.
872///
873/// The `validate` function is called while both queues are locked. Its return
874/// value will determine which operation is performed, or whether the operation
875/// should be aborted. See `RequeueOp` for details about the different possible
876/// return values.
877///
878/// The `callback` function is also called while both queues are locked. It is
879/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
880/// indicating whether a thread was unparked and whether there are threads still
881/// parked in the new queue. This `UnparkResult` value is also returned by
882/// `unpark_requeue`.
883///
884/// The `callback` function should return an `UnparkToken` value which will be
885/// passed to the thread that is unparked. If no thread is unparked then the
886/// returned value is ignored.
887///
888/// # Safety
889///
890/// You should only call this function with an address that you control, since
891/// you could otherwise interfere with the operation of other synchronization
892/// primitives.
893///
894/// The `validate` and `callback` functions are called while the queue is locked
895/// and must not panic or call into any function in `parking_lot`.
896#[inline]
897pub unsafe fn unpark_requeue(
898 key_from: usize,
899 key_to: usize,
900 validate: impl FnOnce() -> RequeueOp,
901 callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
902) -> UnparkResult {
903 // Lock the two buckets for the given key
904 let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
905
906 // If the validation function fails, just return
907 let mut result = UnparkResult::default();
908 let op = validate();
909 if op == RequeueOp::Abort {
910 // SAFETY: Both buckets are locked, as required.
911 unlock_bucket_pair(bucket_from, bucket_to);
912 return result;
913 }
914
915 // Remove all threads with the given key in the source bucket
916 let mut link = &bucket_from.queue_head;
917 let mut current = bucket_from.queue_head.get();
918 let mut previous = ptr::null();
919 let mut requeue_threads: *const ThreadData = ptr::null();
920 let mut requeue_threads_tail: *const ThreadData = ptr::null();
921 let mut wakeup_thread = None;
922 while !current.is_null() {
923 if (*current).key.load(Ordering::Relaxed) == key_from {
924 // Remove the thread from the queue
925 let next = (*current).next_in_queue.get();
926 link.set(next);
927 if bucket_from.queue_tail.get() == current {
928 bucket_from.queue_tail.set(previous);
929 }
930
931 // Prepare the first thread for wakeup and requeue the rest.
932 if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
933 && wakeup_thread.is_none()
934 {
935 wakeup_thread = Some(current);
936 result.unparked_threads = 1;
937 } else {
938 if !requeue_threads.is_null() {
939 (*requeue_threads_tail).next_in_queue.set(current);
940 } else {
941 requeue_threads = current;
942 }
943 requeue_threads_tail = current;
944 (*current).key.store(key_to, Ordering::Relaxed);
945 result.requeued_threads += 1;
946 }
947 if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
948 // Scan the rest of the queue to see if there are any other
949 // entries with the given key.
950 let mut scan = next;
951 while !scan.is_null() {
952 if (*scan).key.load(Ordering::Relaxed) == key_from {
953 result.have_more_threads = true;
954 break;
955 }
956 scan = (*scan).next_in_queue.get();
957 }
958 break;
959 }
960 current = next;
961 } else {
962 link = &(*current).next_in_queue;
963 previous = current;
964 current = link.get();
965 }
966 }
967
968 // Add the requeued threads to the destination bucket
969 if !requeue_threads.is_null() {
970 (*requeue_threads_tail).next_in_queue.set(ptr::null());
971 if !bucket_to.queue_head.get().is_null() {
972 (*bucket_to.queue_tail.get())
973 .next_in_queue
974 .set(requeue_threads);
975 } else {
976 bucket_to.queue_head.set(requeue_threads);
977 }
978 bucket_to.queue_tail.set(requeue_threads_tail);
979 }
980
981 // Invoke the callback before waking up the thread
982 if result.unparked_threads != 0 {
983 result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
984 }
985 let token = callback(op, result);
986
987 // See comment in unpark_one for why we mess with the locking
988 if let Some(wakeup_thread) = wakeup_thread {
989 (*wakeup_thread).unpark_token.set(token);
990 let handle = (*wakeup_thread).parker.unpark_lock();
991 // SAFETY: Both buckets are locked, as required.
992 unlock_bucket_pair(bucket_from, bucket_to);
993 handle.unpark();
994 } else {
995 // SAFETY: Both buckets are locked, as required.
996 unlock_bucket_pair(bucket_from, bucket_to);
997 }
998
999 result
1000}
1001
1002/// Unparks a number of threads from the front of the queue associated with
1003/// `key` depending on the results of a filter function which inspects the
1004/// `ParkToken` associated with each thread.
1005///
1006/// The `filter` function is called for each thread in the queue or until
1007/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
1008/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
1009/// is returned.
1010///
1011/// The `callback` function is also called while both queues are locked. It is
1012/// passed an `UnparkResult` indicating the number of threads that were unparked
1013/// and whether there are still parked threads in the queue. This `UnparkResult`
1014/// value is also returned by `unpark_filter`.
1015///
1016/// The `callback` function should return an `UnparkToken` value which will be
1017/// passed to all threads that are unparked. If no thread is unparked then the
1018/// returned value is ignored.
1019///
1020/// # Safety
1021///
1022/// You should only call this function with an address that you control, since
1023/// you could otherwise interfere with the operation of other synchronization
1024/// primitives.
1025///
1026/// The `filter` and `callback` functions are called while the queue is locked
1027/// and must not panic or call into any function in `parking_lot`.
1028#[inline]
1029pub unsafe fn unpark_filter(
1030 key: usize,
1031 mut filter: impl FnMut(ParkToken) -> FilterOp,
1032 callback: impl FnOnce(UnparkResult) -> UnparkToken,
1033) -> UnparkResult {
1034 // Lock the bucket for the given key
1035 let bucket = lock_bucket(key);
1036
1037 // Go through the queue looking for threads with a matching key
1038 let mut link = &bucket.queue_head;
1039 let mut current = bucket.queue_head.get();
1040 let mut previous = ptr::null();
1041 let mut threads = SmallVec::<[_; 8]>::new();
1042 let mut result = UnparkResult::default();
1043 while !current.is_null() {
1044 if (*current).key.load(Ordering::Relaxed) == key {
1045 // Call the filter function with the thread's ParkToken
1046 let next = (*current).next_in_queue.get();
1047 match filter((*current).park_token.get()) {
1048 FilterOp::Unpark => {
1049 // Remove the thread from the queue
1050 link.set(next);
1051 if bucket.queue_tail.get() == current {
1052 bucket.queue_tail.set(previous);
1053 }
1054
1055 // Add the thread to our list of threads to unpark
1056 threads.push((current, None));
1057
1058 current = next;
1059 }
1060 FilterOp::Skip => {
1061 result.have_more_threads = true;
1062 link = &(*current).next_in_queue;
1063 previous = current;
1064 current = link.get();
1065 }
1066 FilterOp::Stop => {
1067 result.have_more_threads = true;
1068 break;
1069 }
1070 }
1071 } else {
1072 link = &(*current).next_in_queue;
1073 previous = current;
1074 current = link.get();
1075 }
1076 }
1077
1078 // Invoke the callback before waking up the threads
1079 result.unparked_threads = threads.len();
1080 if result.unparked_threads != 0 {
1081 result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
1082 }
1083 let token = callback(result);
1084
1085 // Pass the token to all threads that are going to be unparked and prepare
1086 // them for unparking.
1087 for t in threads.iter_mut() {
1088 (*t.0).unpark_token.set(token);
1089 t.1 = Some((*t.0).parker.unpark_lock());
1090 }
1091
1092 // SAFETY: We hold the lock here, as required
1093 bucket.mutex.unlock();
1094
1095 // Now that we are outside the lock, wake up all the threads that we removed
1096 // from the queue.
1097 for (_, handle) in threads.into_iter() {
1098 handle.unchecked_unwrap().unpark();
1099 }
1100
1101 result
1102}
1103
1104/// \[Experimental\] Deadlock detection
1105///
1106/// Enabled via the `deadlock_detection` feature flag.
1107pub mod deadlock {
1108 #[cfg(feature = "deadlock_detection")]
1109 use super::deadlock_impl;
1110
1111 #[cfg(feature = "deadlock_detection")]
1112 pub(super) use super::deadlock_impl::DeadlockData;
1113
1114 /// Acquire a resource identified by key in the deadlock detector
1115 /// Noop if deadlock_detection feature isn't enabled.
1116 ///
1117 /// # Safety
1118 ///
1119 /// Call after the resource is acquired
1120 #[inline]
1121 pub unsafe fn acquire_resource(_key: usize) {
1122 #[cfg(feature = "deadlock_detection")]
1123 deadlock_impl::acquire_resource(_key);
1124 }
1125
1126 /// Release a resource identified by key in the deadlock detector.
1127 /// Noop if deadlock_detection feature isn't enabled.
1128 ///
1129 /// # Panics
1130 ///
1131 /// Panics if the resource was already released or wasn't acquired in this thread.
1132 ///
1133 /// # Safety
1134 ///
1135 /// Call before the resource is released
1136 #[inline]
1137 pub unsafe fn release_resource(_key: usize) {
1138 #[cfg(feature = "deadlock_detection")]
1139 deadlock_impl::release_resource(_key);
1140 }
1141
1142 /// Returns all deadlocks detected *since* the last call.
1143 /// Each cycle consist of a vector of `DeadlockedThread`.
1144 #[cfg(feature = "deadlock_detection")]
1145 #[inline]
1146 pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
1147 deadlock_impl::check_deadlock()
1148 }
1149
1150 #[inline]
1151 pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
1152 #[cfg(feature = "deadlock_detection")]
1153 deadlock_impl::on_unpark(_td);
1154 }
1155}
1156
1157#[cfg(feature = "deadlock_detection")]
1158mod deadlock_impl {
1159 use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
1160 use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
1161 use crate::word_lock::WordLock;
1162 use backtrace::Backtrace;
1163 use petgraph;
1164 use petgraph::graphmap::DiGraphMap;
1165 use std::cell::{Cell, UnsafeCell};
1166 use std::collections::HashSet;
1167 use std::sync::atomic::Ordering;
1168 use std::sync::mpsc;
1169 use thread_id;
1170
1171 /// Representation of a deadlocked thread
1172 pub struct DeadlockedThread {
1173 thread_id: usize,
1174 backtrace: Backtrace,
1175 }
1176
1177 impl DeadlockedThread {
1178 /// The system thread id
1179 pub fn thread_id(&self) -> usize {
1180 self.thread_id
1181 }
1182
1183 /// The thread backtrace
1184 pub fn backtrace(&self) -> &Backtrace {
1185 &self.backtrace
1186 }
1187 }
1188
1189 pub struct DeadlockData {
1190 // Currently owned resources (keys)
1191 resources: UnsafeCell<Vec<usize>>,
1192
1193 // Set when there's a pending callstack request
1194 deadlocked: Cell<bool>,
1195
1196 // Sender used to report the backtrace
1197 backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,
1198
1199 // System thread id
1200 thread_id: usize,
1201 }
1202
1203 impl DeadlockData {
1204 pub fn new() -> Self {
1205 DeadlockData {
1206 resources: UnsafeCell::new(Vec::new()),
1207 deadlocked: Cell::new(false),
1208 backtrace_sender: UnsafeCell::new(None),
1209 thread_id: thread_id::get(),
1210 }
1211 }
1212 }
1213
1214 pub(super) unsafe fn on_unpark(td: &ThreadData) {
1215 if td.deadlock_data.deadlocked.get() {
1216 let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1217 sender
1218 .send(DeadlockedThread {
1219 thread_id: td.deadlock_data.thread_id,
1220 backtrace: Backtrace::new(),
1221 })
1222 .unwrap();
1223 // make sure to close this sender
1224 drop(sender);
1225
1226 // park until the end of the time
1227 td.parker.prepare_park();
1228 td.parker.park();
1229 unreachable!("unparked deadlocked thread!");
1230 }
1231 }
1232
1233 pub unsafe fn acquire_resource(key: usize) {
1234 with_thread_data(|thread_data| {
1235 (*thread_data.deadlock_data.resources.get()).push(key);
1236 });
1237 }
1238
1239 pub unsafe fn release_resource(key: usize) {
1240 with_thread_data(|thread_data| {
1241 let resources = &mut (*thread_data.deadlock_data.resources.get());
1242
1243 // There is only one situation where we can fail to find the
1244 // resource: we are currently running TLS destructors and our
1245 // ThreadData has already been freed. There isn't much we can do
1246 // about it at this point, so just ignore it.
1247 if let Some(p) = resources.iter().rposition(|x| *x == key) {
1248 resources.swap_remove(p);
1249 }
1250 });
1251 }
1252
1253 pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1254 unsafe {
1255 // fast pass
1256 if check_wait_graph_fast() {
1257 // double check
1258 check_wait_graph_slow()
1259 } else {
1260 Vec::new()
1261 }
1262 }
1263 }
1264
1265 // Simple algorithm that builds a wait graph f the threads and the resources,
1266 // then checks for the presence of cycles (deadlocks).
1267 // This variant isn't precise as it doesn't lock the entire table before checking
1268 unsafe fn check_wait_graph_fast() -> bool {
1269 let table = get_hashtable();
1270 let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1271 let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1272
1273 for b in &(*table).entries[..] {
1274 b.mutex.lock();
1275 let mut current = b.queue_head.get();
1276 while !current.is_null() {
1277 if !(*current).parked_with_timeout.get()
1278 && !(*current).deadlock_data.deadlocked.get()
1279 {
1280 // .resources are waiting for their owner
1281 for &resource in &(*(*current).deadlock_data.resources.get()) {
1282 graph.add_edge(resource, current as usize, ());
1283 }
1284 // owner waits for resource .key
1285 graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1286 }
1287 current = (*current).next_in_queue.get();
1288 }
1289 // SAFETY: We hold the lock here, as required
1290 b.mutex.unlock();
1291 }
1292
1293 petgraph::algo::is_cyclic_directed(&graph)
1294 }
1295
1296 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1297 enum WaitGraphNode {
1298 Thread(*const ThreadData),
1299 Resource(usize),
1300 }
1301
1302 use self::WaitGraphNode::*;
1303
1304 // Contrary to the _fast variant this locks the entries table before looking for cycles.
1305 // Returns all detected thread wait cycles.
1306 // Note that once a cycle is reported it's never reported again.
1307 unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
1308 static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
1309 DEADLOCK_DETECTION_LOCK.lock();
1310
1311 let mut table = get_hashtable();
1312 loop {
1313 // Lock all buckets in the old table
1314 for b in &table.entries[..] {
1315 b.mutex.lock();
1316 }
1317
1318 // Now check if our table is still the latest one. Another thread could
1319 // have grown the hash table between us getting and locking the hash table.
1320 let new_table = get_hashtable();
1321 if new_table as *const _ == table as *const _ {
1322 break;
1323 }
1324
1325 // Unlock buckets and try again
1326 for b in &table.entries[..] {
1327 // SAFETY: We hold the lock here, as required
1328 b.mutex.unlock();
1329 }
1330
1331 table = new_table;
1332 }
1333
1334 let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1335 let mut graph =
1336 DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
1337
1338 for b in &table.entries[..] {
1339 let mut current = b.queue_head.get();
1340 while !current.is_null() {
1341 if !(*current).parked_with_timeout.get()
1342 && !(*current).deadlock_data.deadlocked.get()
1343 {
1344 // .resources are waiting for their owner
1345 for &resource in &(*(*current).deadlock_data.resources.get()) {
1346 graph.add_edge(Resource(resource), Thread(current), ());
1347 }
1348 // owner waits for resource .key
1349 graph.add_edge(
1350 Thread(current),
1351 Resource((*current).key.load(Ordering::Relaxed)),
1352 (),
1353 );
1354 }
1355 current = (*current).next_in_queue.get();
1356 }
1357 }
1358
1359 for b in &table.entries[..] {
1360 // SAFETY: We hold the lock here, as required
1361 b.mutex.unlock();
1362 }
1363
1364 // find cycles
1365 let cycles = graph_cycles(&graph);
1366
1367 let mut results = Vec::with_capacity(cycles.len());
1368
1369 for cycle in cycles {
1370 let (sender, receiver) = mpsc::channel();
1371 for td in cycle {
1372 let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
1373 (*td).deadlock_data.deadlocked.set(true);
1374 *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
1375 let handle = (*td).parker.unpark_lock();
1376 // SAFETY: We hold the lock here, as required
1377 bucket.mutex.unlock();
1378 // unpark the deadlocked thread!
1379 // on unpark it'll notice the deadlocked flag and report back
1380 handle.unpark();
1381 }
1382 // make sure to drop our sender before collecting results
1383 drop(sender);
1384 results.push(receiver.iter().collect());
1385 }
1386
1387 DEADLOCK_DETECTION_LOCK.unlock();
1388
1389 results
1390 }
1391
1392 // normalize a cycle to start with the "smallest" node
1393 fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1394 let min_pos = input
1395 .iter()
1396 .enumerate()
1397 .min_by_key(|&(_, &t)| t)
1398 .map(|(p, _)| p)
1399 .unwrap_or(0);
1400 input
1401 .iter()
1402 .cycle()
1403 .skip(min_pos)
1404 .take(input.len())
1405 .cloned()
1406 .collect()
1407 }
1408
1409 // returns all thread cycles in the wait graph
1410 fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1411 use petgraph::visit::depth_first_search;
1412 use petgraph::visit::DfsEvent;
1413 use petgraph::visit::NodeIndexable;
1414
1415 let mut cycles = HashSet::new();
1416 let mut path = Vec::with_capacity(g.node_bound());
1417 // start from threads to get the correct threads cycle
1418 let threads = g
1419 .nodes()
1420 .filter(|n| if let &Thread(_) = n { true } else { false });
1421
1422 depth_first_search(g, threads, |e| match e {
1423 DfsEvent::Discover(Thread(n), _) => path.push(n),
1424 DfsEvent::Finish(Thread(_), _) => {
1425 path.pop();
1426 }
1427 DfsEvent::BackEdge(_, Thread(n)) => {
1428 let from = path.iter().rposition(|&i| i == n).unwrap();
1429 cycles.insert(normalize_cycle(&path[from..]));
1430 }
1431 _ => (),
1432 });
1433
1434 cycles.iter().cloned().collect()
1435 }
1436}
1437
1438#[cfg(test)]
1439mod tests {
1440 use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
1441 use std::{
1442 ptr,
1443 sync::{
1444 atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
1445 Arc,
1446 },
1447 thread,
1448 time::Duration,
1449 };
1450
1451 /// Calls a closure for every `ThreadData` currently parked on a given key
1452 fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
1453 let bucket = super::lock_bucket(key);
1454
1455 let mut current: *const ThreadData = bucket.queue_head.get();
1456 while !current.is_null() {
1457 let current_ref = unsafe { &*current };
1458 if current_ref.key.load(Ordering::Relaxed) == key {
1459 f(current_ref);
1460 }
1461 current = current_ref.next_in_queue.get();
1462 }
1463
1464 // SAFETY: We hold the lock here, as required
1465 unsafe { bucket.mutex.unlock() };
1466 }
1467
1468 macro_rules! test {
1469 ( $( $name:ident(
1470 repeats: $repeats:expr,
1471 latches: $latches:expr,
1472 delay: $delay:expr,
1473 threads: $threads:expr,
1474 single_unparks: $single_unparks:expr);
1475 )* ) => {
1476 $(#[test]
1477 fn $name() {
1478 let delay = Duration::from_micros($delay);
1479 for _ in 0..$repeats {
1480 run_parking_test($latches, delay, $threads, $single_unparks);
1481 }
1482 })*
1483 };
1484 }
1485
1486 test! {
1487 unpark_all_one_fast(
1488 repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
1489 );
1490 unpark_all_hundred_fast(
1491 repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
1492 );
1493 unpark_one_one_fast(
1494 repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
1495 );
1496 unpark_one_hundred_fast(
1497 repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
1498 );
1499 unpark_one_fifty_then_fifty_all_fast(
1500 repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
1501 );
1502 unpark_all_one(
1503 repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
1504 );
1505 unpark_all_hundred(
1506 repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
1507 );
1508 unpark_one_one(
1509 repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
1510 );
1511 unpark_one_fifty(
1512 repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
1513 );
1514 unpark_one_fifty_then_fifty_all(
1515 repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
1516 );
1517 hundred_unpark_all_one_fast(
1518 repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
1519 );
1520 hundred_unpark_all_one(
1521 repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
1522 );
1523 }
1524
1525 fn run_parking_test(
1526 num_latches: usize,
1527 delay: Duration,
1528 num_threads: usize,
1529 num_single_unparks: usize,
1530 ) {
1531 let mut tests = Vec::with_capacity(num_latches);
1532
1533 for _ in 0..num_latches {
1534 let test = Arc::new(SingleLatchTest::new(num_threads));
1535 let mut threads = Vec::with_capacity(num_threads);
1536 for _ in 0..num_threads {
1537 let test = test.clone();
1538 threads.push(thread::spawn(move || test.run()));
1539 }
1540 tests.push((test, threads));
1541 }
1542
1543 for unpark_index in 0..num_single_unparks {
1544 thread::sleep(delay);
1545 for (test, _) in &tests {
1546 test.unpark_one(unpark_index);
1547 }
1548 }
1549
1550 for (test, threads) in tests {
1551 test.finish(num_single_unparks);
1552 for thread in threads {
1553 thread.join().expect("Test thread panic");
1554 }
1555 }
1556 }
1557
1558 struct SingleLatchTest {
1559 semaphore: AtomicIsize,
1560 num_awake: AtomicUsize,
1561 /// Holds the pointer to the last *unprocessed* woken up thread.
1562 last_awoken: AtomicPtr<ThreadData>,
1563 /// Total number of threads participating in this test.
1564 num_threads: usize,
1565 }
1566
1567 impl SingleLatchTest {
1568 pub fn new(num_threads: usize) -> Self {
1569 Self {
1570 // This implements a fair (FIFO) semaphore, and it starts out unavailable.
1571 semaphore: AtomicIsize::new(0),
1572 num_awake: AtomicUsize::new(0),
1573 last_awoken: AtomicPtr::new(ptr::null_mut()),
1574 num_threads,
1575 }
1576 }
1577
1578 pub fn run(&self) {
1579 // Get one slot from the semaphore
1580 self.down();
1581
1582 // Report back to the test verification code that this thread woke up
1583 let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
1584 self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
1585 self.num_awake.fetch_add(1, Ordering::SeqCst);
1586 }
1587
1588 pub fn unpark_one(&self, single_unpark_index: usize) {
1589 // last_awoken should be null at all times except between self.up() and at the bottom
1590 // of this method where it's reset to null again
1591 assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
1592
1593 let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
1594 for_each(self.semaphore_addr(), |thread_data| {
1595 queue.push(thread_data as *const _ as *mut _);
1596 });
1597 assert!(queue.len() <= self.num_threads - single_unpark_index);
1598
1599 let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
1600
1601 self.up();
1602
1603 // Wait for a parked thread to wake up and update num_awake + last_awoken.
1604 while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
1605 thread::yield_now();
1606 }
1607
1608 // At this point the other thread should have set last_awoken inside the run() method
1609 let last_awoken = self.last_awoken.load(Ordering::SeqCst);
1610 assert!(!last_awoken.is_null());
1611 if !queue.is_empty() && queue[0] != last_awoken {
1612 panic!(
1613 "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1614 queue, last_awoken
1615 );
1616 }
1617 self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
1618 }
1619
1620 pub fn finish(&self, num_single_unparks: usize) {
1621 // The amount of threads not unparked via unpark_one
1622 let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
1623
1624 // Wake remaining threads up with unpark_all. Has to be in a loop, because there might
1625 // still be threads that has not yet parked.
1626 while num_threads_left > 0 {
1627 let mut num_waiting_on_address = 0;
1628 for_each(self.semaphore_addr(), |_thread_data| {
1629 num_waiting_on_address += 1;
1630 });
1631 assert!(num_waiting_on_address <= num_threads_left);
1632
1633 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
1634
1635 let num_unparked =
1636 unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
1637 assert!(num_unparked >= num_waiting_on_address);
1638 assert!(num_unparked <= num_threads_left);
1639
1640 // Wait for all unparked threads to wake up and update num_awake + last_awoken.
1641 while self.num_awake.load(Ordering::SeqCst)
1642 != num_awake_before_unpark + num_unparked
1643 {
1644 thread::yield_now()
1645 }
1646
1647 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
1648 }
1649 // By now, all threads should have been woken up
1650 assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
1651
1652 // Make sure no thread is parked on our semaphore address
1653 let mut num_waiting_on_address = 0;
1654 for_each(self.semaphore_addr(), |_thread_data| {
1655 num_waiting_on_address += 1;
1656 });
1657 assert_eq!(num_waiting_on_address, 0);
1658 }
1659
1660 pub fn down(&self) {
1661 let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
1662
1663 if old_semaphore_value > 0 {
1664 // We acquired the semaphore. Done.
1665 return;
1666 }
1667
1668 // We need to wait.
1669 let validate = || true;
1670 let before_sleep = || {};
1671 let timed_out = |_, _| {};
1672 unsafe {
1673 super::park(
1674 self.semaphore_addr(),
1675 validate,
1676 before_sleep,
1677 timed_out,
1678 DEFAULT_PARK_TOKEN,
1679 None,
1680 );
1681 }
1682 }
1683
1684 pub fn up(&self) {
1685 let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
1686
1687 // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
1688 if old_semaphore_value < 0 {
1689 // We need to continue until we have actually unparked someone. It might be that
1690 // the thread we want to pass ownership to has decremented the semaphore counter,
1691 // but not yet parked.
1692 loop {
1693 match unsafe {
1694 super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
1695 .unparked_threads
1696 } {
1697 1 => break,
1698 0 => (),
1699 i => panic!("Should not wake up {} threads", i),
1700 }
1701 }
1702 }
1703 }
1704
1705 fn semaphore_addr(&self) -> usize {
1706 &self.semaphore as *const _ as usize
1707 }
1708 }
1709}