// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use crate::util::UncheckedOptionExt;
use crate::word_lock::WordLock;
use cfg_if::cfg_if;
use core::{
    cell::{Cell, UnsafeCell},
    ptr,
    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use instant::Instant;
use smallvec::SmallVec;
use std::time::Duration;

cfg_if! {
    if #[cfg(all(
        target_arch = "wasm32",
        target_os = "unknown",
        target_vendor = "unknown"
    ))] {
        use core::ops::Add;

        #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
        struct DummyInstant(Duration);

        impl DummyInstant {
            pub fn now() -> DummyInstant {
                DummyInstant::zero()
            }

            const fn zero() -> DummyInstant {
                DummyInstant(Duration::from_secs(0))
            }
        }

        impl Add<Duration> for DummyInstant {
            type Output = DummyInstant;

            fn add(self, _rhs: Duration) -> DummyInstant {
                DummyInstant::zero()
            }
        }

        // Use a dummy implementation for `Instant` on `wasm32`, since
        // `Instant::now()` will always panic there: time is currently not
        // implemented on wasm32-unknown-unknown.
        // See https://github.com/rust-lang/rust/blob/master/src/libstd/sys/wasm/time.rs
        type InstantType = DummyInstant;
    } else {
        // Otherwise use `instant::Instant`.
        type InstantType = Instant;
    }
}

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Holds the pointer to the currently active `HashTable`.
///
/// # Safety
///
/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
/// Any `HashTable` this global static has ever pointed to must never be freed.
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());

// Even with 3x more buckets than threads, the memory overhead is still only a
// few hundred bytes per thread: each cache-line-aligned `Bucket` below is 64
// bytes, so three buckets per thread come to roughly 200 bytes.
const LOAD_FACTOR: usize = 3;
73 | ||
74 | struct HashTable { | |
75 | // Hash buckets for the table | |
76 | entries: Box<[Bucket]>, | |
77 | ||
78 | // Number of bits used for the hash function | |
79 | hash_bits: u32, | |
80 | ||
81 | // Previous table. This is only kept to keep leak detectors happy. | |
82 | _prev: *const HashTable, | |
83 | } | |
84 | ||
85 | impl HashTable { | |
86 | #[inline] | |
87 | fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> { | |
88 | let new_size = (num_threads * LOAD_FACTOR).next_power_of_two(); | |
89 | let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1; | |
90 | ||
f035d41b | 91 | let now = InstantType::now(); |
ba9703b0 XL |
92 | let mut entries = Vec::with_capacity(new_size); |
93 | for i in 0..new_size { | |
94 | // We must ensure the seed is not zero | |
95 | entries.push(Bucket::new(now, i as u32 + 1)); | |
96 | } | |
97 | ||
98 | Box::new(HashTable { | |
99 | entries: entries.into_boxed_slice(), | |
100 | hash_bits, | |
101 | _prev: prev, | |
102 | }) | |
103 | } | |
104 | } | |
105 | ||
106 | #[repr(align(64))] | |
107 | struct Bucket { | |
108 | // Lock protecting the queue | |
109 | mutex: WordLock, | |
110 | ||
111 | // Linked list of threads waiting on this bucket | |
112 | queue_head: Cell<*const ThreadData>, | |
113 | queue_tail: Cell<*const ThreadData>, | |
114 | ||
115 | // Next time at which point be_fair should be set | |
116 | fair_timeout: UnsafeCell<FairTimeout>, | |
117 | } | |
118 | ||
119 | impl Bucket { | |
120 | #[inline] | |
f035d41b | 121 | pub fn new(timeout: InstantType, seed: u32) -> Self { |
ba9703b0 XL |
122 | Self { |
123 | mutex: WordLock::new(), | |
124 | queue_head: Cell::new(ptr::null()), | |
125 | queue_tail: Cell::new(ptr::null()), | |
126 | fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)), | |
127 | } | |
128 | } | |
129 | } | |
130 | ||
131 | struct FairTimeout { | |
132 | // Next time at which point be_fair should be set | |
f035d41b | 133 | timeout: InstantType, |
ba9703b0 XL |
134 | |
135 | // the PRNG state for calculating the next timeout | |
136 | seed: u32, | |
137 | } | |
138 | ||
139 | impl FairTimeout { | |
140 | #[inline] | |
f035d41b | 141 | fn new(timeout: InstantType, seed: u32) -> FairTimeout { |
ba9703b0 XL |
142 | FairTimeout { timeout, seed } |
143 | } | |
144 | ||
145 | // Determine whether we should force a fair unlock, and update the timeout | |
146 | #[inline] | |
147 | fn should_timeout(&mut self) -> bool { | |
f035d41b | 148 | let now = InstantType::now(); |
ba9703b0 XL |
149 | if now > self.timeout { |
150 | // Time between 0 and 1ms. | |
151 | let nanos = self.gen_u32() % 1_000_000; | |
152 | self.timeout = now + Duration::new(0, nanos); | |
153 | true | |
154 | } else { | |
155 | false | |
156 | } | |
157 | } | |
158 | ||
159 | // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia. | |
160 | fn gen_u32(&mut self) -> u32 { | |
161 | self.seed ^= self.seed << 13; | |
162 | self.seed ^= self.seed >> 17; | |
163 | self.seed ^= self.seed << 5; | |
164 | self.seed | |
165 | } | |
166 | } | |
167 | ||
168 | struct ThreadData { | |
169 | parker: ThreadParker, | |
170 | ||
171 | // Key that this thread is sleeping on. This may change if the thread is | |
172 | // requeued to a different key. | |
173 | key: AtomicUsize, | |
174 | ||
175 | // Linked list of parked threads in a bucket | |
176 | next_in_queue: Cell<*const ThreadData>, | |
177 | ||
178 | // UnparkToken passed to this thread when it is unparked | |
179 | unpark_token: Cell<UnparkToken>, | |
180 | ||
181 | // ParkToken value set by the thread when it was parked | |
182 | park_token: Cell<ParkToken>, | |
183 | ||
184 | // Is the thread parked with a timeout? | |
185 | parked_with_timeout: Cell<bool>, | |
186 | ||
187 | // Extra data for deadlock detection | |
188 | #[cfg(feature = "deadlock_detection")] | |
189 | deadlock_data: deadlock::DeadlockData, | |
190 | } | |
191 | ||
192 | impl ThreadData { | |
193 | fn new() -> ThreadData { | |
194 | // Keep track of the total number of live ThreadData objects and resize | |
195 | // the hash table accordingly. | |
196 | let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1; | |
197 | grow_hashtable(num_threads); | |
198 | ||
199 | ThreadData { | |
200 | parker: ThreadParker::new(), | |
201 | key: AtomicUsize::new(0), | |
202 | next_in_queue: Cell::new(ptr::null()), | |
203 | unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN), | |
204 | park_token: Cell::new(DEFAULT_PARK_TOKEN), | |
205 | parked_with_timeout: Cell::new(false), | |
206 | #[cfg(feature = "deadlock_detection")] | |
207 | deadlock_data: deadlock::DeadlockData::new(), | |
208 | } | |
209 | } | |
210 | } | |
211 | ||
212 | // Invokes the given closure with a reference to the current thread `ThreadData`. | |
213 | #[inline(always)] | |
214 | fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T { | |
215 | // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive | |
216 | // to construct. Try to use a thread-local version if possible. Otherwise just | |
217 | // create a ThreadData on the stack | |
218 | let mut thread_data_storage = None; | |
219 | thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); | |
220 | let thread_data_ptr = THREAD_DATA | |
221 | .try_with(|x| x as *const ThreadData) | |
222 | .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new)); | |
223 | ||
224 | f(unsafe { &*thread_data_ptr }) | |
225 | } | |
226 | ||
227 | impl Drop for ThreadData { | |
228 | fn drop(&mut self) { | |
229 | NUM_THREADS.fetch_sub(1, Ordering::Relaxed); | |
230 | } | |
231 | } | |
232 | ||
233 | /// Returns a reference to the latest hash table, creating one if it doesn't exist yet. | |
234 | /// The reference is valid forever. However, the `HashTable` it references might become stale | |
235 | /// at any point. Meaning it still exists, but it is not the instance in active use. | |
236 | #[inline] | |
237 | fn get_hashtable() -> &'static HashTable { | |
238 | let table = HASHTABLE.load(Ordering::Acquire); | |
239 | ||
240 | // If there is no table, create one | |
241 | if table.is_null() { | |
242 | create_hashtable() | |
243 | } else { | |
244 | // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed. | |
245 | unsafe { &*table } | |
246 | } | |
247 | } | |
248 | ||
249 | /// Returns a reference to the latest hash table, creating one if it doesn't exist yet. | |
250 | /// The reference is valid forever. However, the `HashTable` it references might become stale | |
251 | /// at any point. Meaning it still exists, but it is not the instance in active use. | |
252 | #[cold] | |
253 | fn create_hashtable() -> &'static HashTable { | |
254 | let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null())); | |
255 | ||
256 | // If this fails then it means some other thread created the hash table first. | |
257 | let table = match HASHTABLE.compare_exchange( | |
258 | ptr::null_mut(), | |
259 | new_table, | |
260 | Ordering::AcqRel, | |
261 | Ordering::Acquire, | |
262 | ) { | |
263 | Ok(_) => new_table, | |
264 | Err(old_table) => { | |
265 | // Free the table we created | |
266 | // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here. | |
267 | unsafe { | |
268 | Box::from_raw(new_table); | |
269 | } | |
270 | old_table | |
271 | } | |
272 | }; | |
273 | // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we | |
274 | // created here, or it is one loaded from `HASHTABLE`. | |
275 | unsafe { &*table } | |
276 | } | |
277 | ||
278 | // Grow the hash table so that it is big enough for the given number of threads. | |
279 | // This isn't performance-critical since it is only done when a ThreadData is | |
280 | // created, which only happens once per thread. | |
281 | fn grow_hashtable(num_threads: usize) { | |
282 | // Lock all buckets in the existing table and get a reference to it | |
283 | let old_table = loop { | |
284 | let table = get_hashtable(); | |
285 | ||
286 | // Check if we need to resize the existing table | |
287 | if table.entries.len() >= LOAD_FACTOR * num_threads { | |
288 | return; | |
289 | } | |
290 | ||
291 | // Lock all buckets in the old table | |
292 | for bucket in &table.entries[..] { | |
293 | bucket.mutex.lock(); | |
294 | } | |
295 | ||
296 | // Now check if our table is still the latest one. Another thread could | |
297 | // have grown the hash table between us reading HASHTABLE and locking | |
298 | // the buckets. | |
299 | if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ { | |
300 | break table; | |
301 | } | |
302 | ||
303 | // Unlock buckets and try again | |
304 | for bucket in &table.entries[..] { | |
305 | // SAFETY: We hold the lock here, as required | |
306 | unsafe { bucket.mutex.unlock() }; | |
307 | } | |
308 | }; | |
309 | ||
310 | // Create the new table | |
311 | let mut new_table = HashTable::new(num_threads, old_table); | |
312 | ||
313 | // Move the entries from the old table to the new one | |
314 | for bucket in &old_table.entries[..] { | |
315 | // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked | |
316 | // lists. All `ThreadData` instances in these lists will remain valid as long as they are | |
317 | // present in the lists, meaning as long as their threads are parked. | |
318 | unsafe { rehash_bucket_into(bucket, &mut new_table) }; | |
319 | } | |
320 | ||
321 | // Publish the new table. No races are possible at this point because | |
322 | // any other thread trying to grow the hash table is blocked on the bucket | |
323 | // locks in the old table. | |
324 | HASHTABLE.store(Box::into_raw(new_table), Ordering::Release); | |
325 | ||
326 | // Unlock all buckets in the old table | |
327 | for bucket in &old_table.entries[..] { | |
328 | // SAFETY: We hold the lock here, as required | |
329 | unsafe { bucket.mutex.unlock() }; | |
330 | } | |
331 | } | |
332 | ||
333 | /// Iterate through all `ThreadData` objects in the bucket and insert them into the given table | |
334 | /// in the bucket their key correspond to for this table. | |
335 | /// | |
336 | /// # Safety | |
337 | /// | |
338 | /// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing | |
339 | /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use. | |
340 | /// | |
341 | /// The given `table` must only contain buckets with correctly constructed linked lists. | |
342 | unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) { | |
343 | let mut current: *const ThreadData = bucket.queue_head.get(); | |
344 | while !current.is_null() { | |
345 | let next = (*current).next_in_queue.get(); | |
346 | let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits); | |
347 | if table.entries[hash].queue_tail.get().is_null() { | |
348 | table.entries[hash].queue_head.set(current); | |
349 | } else { | |
350 | (*table.entries[hash].queue_tail.get()) | |
351 | .next_in_queue | |
352 | .set(current); | |
353 | } | |
354 | table.entries[hash].queue_tail.set(current); | |
355 | (*current).next_in_queue.set(ptr::null()); | |
356 | current = next; | |
357 | } | |
358 | } | |
359 | ||
360 | // Hash function for addresses | |
361 | #[cfg(target_pointer_width = "32")] | |
362 | #[inline] | |
363 | fn hash(key: usize, bits: u32) -> usize { | |
364 | key.wrapping_mul(0x9E3779B9) >> (32 - bits) | |
365 | } | |
366 | #[cfg(target_pointer_width = "64")] | |
367 | #[inline] | |
368 | fn hash(key: usize, bits: u32) -> usize { | |
369 | key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits) | |
370 | } | |
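// Note: the multipliers above are the 32-bit and 64-bit golden ratio constants
// (⌊2^32 / φ⌋ = 0x9E3779B9 and ⌊2^64 / φ⌋ = 0x9E3779B97F4A7C15) used in
// Fibonacci hashing: multiplying by them scrambles the low, mostly-aligned
// bits of an address, and the shift keeps only the top `bits` bits as the
// bucket index.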
371 | ||
372 | /// Locks the bucket for the given key and returns a reference to it. | |
373 | /// The returned bucket must be unlocked again in order to not cause deadlocks. | |
374 | #[inline] | |
375 | fn lock_bucket(key: usize) -> &'static Bucket { | |
376 | loop { | |
377 | let hashtable = get_hashtable(); | |
378 | ||
379 | let hash = hash(key, hashtable.hash_bits); | |
380 | let bucket = &hashtable.entries[hash]; | |
381 | ||
382 | // Lock the bucket | |
383 | bucket.mutex.lock(); | |
384 | ||
385 | // If no other thread has rehashed the table before we grabbed the lock | |
386 | // then we are good to go! The lock we grabbed prevents any rehashes. | |
387 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { | |
388 | return bucket; | |
389 | } | |
390 | ||
391 | // Unlock the bucket and try again | |
392 | // SAFETY: We hold the lock here, as required | |
393 | unsafe { bucket.mutex.unlock() }; | |
394 | } | |
395 | } | |
396 | ||
397 | /// Locks the bucket for the given key and returns a reference to it. But checks that the key | |
398 | /// hasn't been changed in the meantime due to a requeue. | |
399 | /// The returned bucket must be unlocked again in order to not cause deadlocks. | |
400 | #[inline] | |
401 | fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { | |
402 | loop { | |
403 | let hashtable = get_hashtable(); | |
404 | let current_key = key.load(Ordering::Relaxed); | |
405 | ||
406 | let hash = hash(current_key, hashtable.hash_bits); | |
407 | let bucket = &hashtable.entries[hash]; | |
408 | ||
409 | // Lock the bucket | |
410 | bucket.mutex.lock(); | |
411 | ||
412 | // Check that both the hash table and key are correct while the bucket | |
413 | // is locked. Note that the key can't change once we locked the proper | |
414 | // bucket for it, so we just keep trying until we have the correct key. | |
415 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ | |
416 | && key.load(Ordering::Relaxed) == current_key | |
417 | { | |
418 | return (current_key, bucket); | |
419 | } | |
420 | ||
421 | // Unlock the bucket and try again | |
422 | // SAFETY: We hold the lock here, as required | |
423 | unsafe { bucket.mutex.unlock() }; | |
424 | } | |
425 | } | |
426 | ||
427 | /// Locks the two buckets for the given pair of keys and returns references to them. | |
428 | /// The returned buckets must be unlocked again in order to not cause deadlocks. | |
429 | /// | |
430 | /// If both keys hash to the same value, both returned references will be to the same bucket. Be | |
431 | /// careful to only unlock it once in this case, always use `unlock_bucket_pair`. | |
432 | #[inline] | |
433 | fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) { | |
434 | loop { | |
435 | let hashtable = get_hashtable(); | |
436 | ||
437 | let hash1 = hash(key1, hashtable.hash_bits); | |
438 | let hash2 = hash(key2, hashtable.hash_bits); | |
439 | ||
440 | // Get the bucket at the lowest hash/index first | |
441 | let bucket1 = if hash1 <= hash2 { | |
442 | &hashtable.entries[hash1] | |
443 | } else { | |
444 | &hashtable.entries[hash2] | |
445 | }; | |
446 | ||
447 | // Lock the first bucket | |
448 | bucket1.mutex.lock(); | |
449 | ||
450 | // If no other thread has rehashed the table before we grabbed the lock | |
451 | // then we are good to go! The lock we grabbed prevents any rehashes. | |
452 | if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { | |
453 | // Now lock the second bucket and return the two buckets | |
454 | if hash1 == hash2 { | |
455 | return (bucket1, bucket1); | |
456 | } else if hash1 < hash2 { | |
457 | let bucket2 = &hashtable.entries[hash2]; | |
458 | bucket2.mutex.lock(); | |
459 | return (bucket1, bucket2); | |
460 | } else { | |
461 | let bucket2 = &hashtable.entries[hash1]; | |
462 | bucket2.mutex.lock(); | |
463 | return (bucket2, bucket1); | |
464 | } | |
465 | } | |
466 | ||
467 | // Unlock the bucket and try again | |
468 | // SAFETY: We hold the lock here, as required | |
469 | unsafe { bucket1.mutex.unlock() }; | |
470 | } | |
471 | } | |
472 | ||
473 | /// Unlock a pair of buckets | |
474 | /// | |
475 | /// # Safety | |
476 | /// | |
477 | /// Both buckets must be locked | |
478 | #[inline] | |
479 | unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) { | |
480 | bucket1.mutex.unlock(); | |
481 | if !ptr::eq(bucket1, bucket2) { | |
482 | bucket2.mutex.unlock(); | |
483 | } | |
484 | } | |
485 | ||
486 | /// Result of a park operation. | |
487 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] | |
488 | pub enum ParkResult { | |
489 | /// We were unparked by another thread with the given token. | |
490 | Unparked(UnparkToken), | |
491 | ||
492 | /// The validation callback returned false. | |
493 | Invalid, | |
494 | ||
495 | /// The timeout expired. | |
496 | TimedOut, | |
497 | } | |
498 | ||
499 | impl ParkResult { | |
500 | /// Returns true if we were unparked by another thread. | |
501 | #[inline] | |
502 | pub fn is_unparked(self) -> bool { | |
503 | if let ParkResult::Unparked(_) = self { | |
504 | true | |
505 | } else { | |
506 | false | |
507 | } | |
508 | } | |
509 | } | |
510 | ||
511 | /// Result of an unpark operation. | |
512 | #[derive(Copy, Clone, Default, Eq, PartialEq, Debug)] | |
513 | pub struct UnparkResult { | |
514 | /// The number of threads that were unparked. | |
515 | pub unparked_threads: usize, | |
516 | ||
517 | /// The number of threads that were requeued. | |
518 | pub requeued_threads: usize, | |
519 | ||
520 | /// Whether there are any threads remaining in the queue. This only returns | |
521 | /// true if a thread was unparked. | |
522 | pub have_more_threads: bool, | |
523 | ||
524 | /// This is set to true on average once every 0.5ms for any given key. It | |
525 | /// should be used to switch to a fair unlocking mechanism for a particular | |
526 | /// unlock. | |
527 | pub be_fair: bool, | |
528 | ||
529 | /// Private field so new fields can be added without breakage. | |
530 | _sealed: (), | |
531 | } | |
532 | ||
533 | /// Operation that `unpark_requeue` should perform. | |
534 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] | |
535 | pub enum RequeueOp { | |
536 | /// Abort the operation without doing anything. | |
537 | Abort, | |
538 | ||
539 | /// Unpark one thread and requeue the rest onto the target queue. | |
540 | UnparkOneRequeueRest, | |
541 | ||
542 | /// Requeue all threads onto the target queue. | |
543 | RequeueAll, | |
544 | ||
545 | /// Unpark one thread and leave the rest parked. No requeuing is done. | |
546 | UnparkOne, | |
547 | ||
548 | /// Requeue one thread and leave the rest parked on the original queue. | |
549 | RequeueOne, | |
550 | } | |
551 | ||
552 | /// Operation that `unpark_filter` should perform for each thread. | |
553 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] | |
554 | pub enum FilterOp { | |
555 | /// Unpark the thread and continue scanning the list of parked threads. | |
556 | Unpark, | |
557 | ||
558 | /// Don't unpark the thread and continue scanning the list of parked threads. | |
559 | Skip, | |
560 | ||
561 | /// Don't unpark the thread and stop scanning the list of parked threads. | |
562 | Stop, | |
563 | } | |
564 | ||
565 | /// A value which is passed from an unparker to a parked thread. | |
566 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] | |
567 | pub struct UnparkToken(pub usize); | |
568 | ||
569 | /// A value associated with a parked thread which can be used by `unpark_filter`. | |
570 | #[derive(Copy, Clone, Eq, PartialEq, Debug)] | |
571 | pub struct ParkToken(pub usize); | |
572 | ||
573 | /// A default unpark token to use. | |
574 | pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0); | |
575 | ||
576 | /// A default park token to use. | |
577 | pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0); | |
578 | ||
579 | /// Parks the current thread in the queue associated with the given key. | |
580 | /// | |
581 | /// The `validate` function is called while the queue is locked and can abort | |
582 | /// the operation by returning false. If `validate` returns true then the | |
583 | /// current thread is appended to the queue and the queue is unlocked. | |
584 | /// | |
585 | /// The `before_sleep` function is called after the queue is unlocked but before | |
586 | /// the thread is put to sleep. The thread will then sleep until it is unparked | |
587 | /// or the given timeout is reached. | |
588 | /// | |
589 | /// The `timed_out` function is also called while the queue is locked, but only | |
590 | /// if the timeout was reached. It is passed the key of the queue it was in when | |
591 | /// it timed out, which may be different from the original key if | |
592 | /// `unpark_requeue` was called. It is also passed a bool which indicates | |
593 | /// whether it was the last thread in the queue. | |
594 | /// | |
595 | /// # Safety | |
596 | /// | |
597 | /// You should only call this function with an address that you control, since | |
598 | /// you could otherwise interfere with the operation of other synchronization | |
599 | /// primitives. | |
600 | /// | |
601 | /// The `validate` and `timed_out` functions are called while the queue is | |
602 | /// locked and must not panic or call into any function in `parking_lot`. | |
603 | /// | |
604 | /// The `before_sleep` function is called outside the queue lock and is allowed | |
605 | /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but | |
606 | /// it is not allowed to call `park` or panic. | |
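///
/// # Example
///
/// An illustrative sketch, not from the original documentation: one side parks
/// until a flag is set, the other sets the flag and broadcasts with
/// `unpark_all`. The flag's address serves as the key on both sides.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// unsafe fn wait(flag: &AtomicBool) {
///     while !flag.load(Ordering::Acquire) {
///         park(
///             flag as *const _ as usize,        // key: the flag's address
///             || !flag.load(Ordering::Acquire), // re-check under the queue lock
///             || {},                            // nothing to do before sleeping
///             |_key, _was_last| {},             // no timeout, so never called
///             DEFAULT_PARK_TOKEN,
///             None, // park without a timeout
///         );
///     }
/// }
///
/// unsafe fn set(flag: &AtomicBool) {
///     flag.store(true, Ordering::Release);
///     unpark_all(flag as *const _ as usize, DEFAULT_UNPARK_TOKEN);
/// }
/// ```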
#[inline]
pub unsafe fn park(
    key: usize,
    validate: impl FnOnce() -> bool,
    before_sleep: impl FnOnce(),
    timed_out: impl FnOnce(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data, this also ensures that the hash table exists
    with_thread_data(|thread_data| {
        // Lock the bucket for the given key
        let bucket = lock_bucket(key);

        // If the validation function fails, just return
        if !validate() {
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            return ParkResult::Invalid;
        }

        // Append our thread data to the queue and unlock the bucket
        thread_data.parked_with_timeout.set(timeout.is_some());
        thread_data.next_in_queue.set(ptr::null());
        thread_data.key.store(key, Ordering::Relaxed);
        thread_data.park_token.set(park_token);
        thread_data.parker.prepare_park();
        if !bucket.queue_head.get().is_null() {
            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
        } else {
            bucket.queue_head.set(thread_data);
        }
        bucket.queue_tail.set(thread_data);
        // SAFETY: We hold the lock here, as required
        bucket.mutex.unlock();

        // Invoke the pre-sleep callback
        before_sleep();

        // Park our thread and determine whether we were woken up by an unpark
        // or by our timeout. Note that this isn't precise: we can still be
        // unparked since we are still in the queue.
        let unparked = match timeout {
            Some(timeout) => thread_data.parker.park_until(timeout),
            None => {
                thread_data.parker.park();
                // Call the deadlock detection on_unpark hook
                deadlock::on_unpark(thread_data);
                true
            }
        };

        // If we were unparked, return now
        if unparked {
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // Lock our bucket again. Note that the hashtable may have been rehashed in
        // the meantime. Our key may also have changed if we were requeued.
        let (key, bucket) = lock_bucket_checked(&thread_data.key);

        // Now we need to check again if we were unparked or timed out. Unlike the
        // last check this is precise because we hold the bucket lock.
        if !thread_data.parker.timed_out() {
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // We timed out, so we now need to remove our thread from the queue
        let mut link = &bucket.queue_head;
        let mut current = bucket.queue_head.get();
        let mut previous = ptr::null();
        let mut was_last_thread = true;
        while !current.is_null() {
            if current == thread_data {
                let next = (*current).next_in_queue.get();
                link.set(next);
                if bucket.queue_tail.get() == current {
                    bucket.queue_tail.set(previous);
                } else {
                    // Scan the rest of the queue to see if there are any other
                    // entries with the given key.
                    let mut scan = next;
                    while !scan.is_null() {
                        if (*scan).key.load(Ordering::Relaxed) == key {
                            was_last_thread = false;
                            break;
                        }
                        scan = (*scan).next_in_queue.get();
                    }
                }

                // Callback to indicate that we timed out, and whether we were the
                // last thread on the queue.
                timed_out(key, was_last_thread);
                break;
            } else {
                if (*current).key.load(Ordering::Relaxed) == key {
                    was_last_thread = false;
                }
                link = &(*current).next_in_queue;
                previous = current;
                current = link.get();
            }
        }

        // There should be no way for our thread to have been removed from the queue
        // if we timed out.
        debug_assert!(!current.is_null());

        // Unlock the bucket, we are done
        // SAFETY: We hold the lock here, as required
        bucket.mutex.unlock();
        ParkResult::TimedOut
    })
}
724 | ||
725 | /// Unparks one thread from the queue associated with the given key. | |
726 | /// | |
727 | /// The `callback` function is called while the queue is locked and before the | |
728 | /// target thread is woken up. The `UnparkResult` argument to the function | |
729 | /// indicates whether a thread was found in the queue and whether this was the | |
730 | /// last thread in the queue. This value is also returned by `unpark_one`. | |
731 | /// | |
732 | /// The `callback` function should return an `UnparkToken` value which will be | |
733 | /// passed to the thread that is unparked. If no thread is unparked then the | |
734 | /// returned value is ignored. | |
735 | /// | |
736 | /// # Safety | |
737 | /// | |
738 | /// You should only call this function with an address that you control, since | |
739 | /// you could otherwise interfere with the operation of other synchronization | |
740 | /// primitives. | |
741 | /// | |
742 | /// The `callback` function is called while the queue is locked and must not | |
743 | /// panic or call into any function in `parking_lot`. | |
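///
/// # Example
///
/// An illustrative sketch, not from the original documentation: the callback
/// runs under the queue lock, so it is the right place to update shared state
/// atomically with the wakeup. Here a hypothetical `waiters` flag is cleared
/// once the queue drains.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// unsafe fn notify_one(key: usize, waiters: &AtomicBool) {
///     unpark_one(key, |result| {
///         if !result.have_more_threads {
///             // No threads left parked on this key; clear the flag so the
///             // uncontended path can skip calling unpark_one entirely.
///             waiters.store(false, Ordering::Relaxed);
///         }
///         DEFAULT_UNPARK_TOKEN
///     });
/// }
/// ```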
#[inline]
pub unsafe fn unpark_one(
    key: usize,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();
    result
}
809 | ||
810 | /// Unparks all threads in the queue associated with the given key. | |
811 | /// | |
812 | /// The given `UnparkToken` is passed to all unparked threads. | |
813 | /// | |
814 | /// This function returns the number of threads that were unparked. | |
815 | /// | |
816 | /// # Safety | |
817 | /// | |
818 | /// You should only call this function with an address that you control, since | |
819 | /// you could otherwise interfere with the operation of other synchronization | |
820 | /// primitives. | |
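///
/// # Example
///
/// An illustrative sketch, not from the original documentation: releasing
/// every reader parked on a hypothetical rwlock's address once a writer is
/// done.
///
/// ```ignore
/// unsafe fn wake_all_readers(rwlock_addr: usize) -> usize {
///     // Returns how many parked threads were woken.
///     unpark_all(rwlock_addr, DEFAULT_UNPARK_TOKEN)
/// }
/// ```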
#[inline]
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}
868 | ||
869 | /// Removes all threads from the queue associated with `key_from`, optionally | |
870 | /// unparks the first one and requeues the rest onto the queue associated with | |
871 | /// `key_to`. | |
872 | /// | |
873 | /// The `validate` function is called while both queues are locked. Its return | |
874 | /// value will determine which operation is performed, or whether the operation | |
875 | /// should be aborted. See `RequeueOp` for details about the different possible | |
876 | /// return values. | |
877 | /// | |
878 | /// The `callback` function is also called while both queues are locked. It is | |
879 | /// passed the `RequeueOp` returned by `validate` and an `UnparkResult` | |
880 | /// indicating whether a thread was unparked and whether there are threads still | |
881 | /// parked in the new queue. This `UnparkResult` value is also returned by | |
882 | /// `unpark_requeue`. | |
883 | /// | |
884 | /// The `callback` function should return an `UnparkToken` value which will be | |
885 | /// passed to the thread that is unparked. If no thread is unparked then the | |
886 | /// returned value is ignored. | |
887 | /// | |
888 | /// # Safety | |
889 | /// | |
890 | /// You should only call this function with an address that you control, since | |
891 | /// you could otherwise interfere with the operation of other synchronization | |
892 | /// primitives. | |
893 | /// | |
894 | /// The `validate` and `callback` functions are called while the queue is locked | |
895 | /// and must not panic or call into any function in `parking_lot`. | |
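///
/// # Example
///
/// An illustrative sketch, not from the original documentation: the condvar
/// "notify_all" pattern, which wakes one waiter and moves the rest onto the
/// mutex's queue so they wake one at a time as the mutex is unlocked.
///
/// ```ignore
/// unsafe fn notify_all(condvar_key: usize, mutex_key: usize) -> UnparkResult {
///     unpark_requeue(
///         condvar_key,
///         mutex_key,
///         || RequeueOp::UnparkOneRequeueRest, // return Abort to do nothing
///         |_op, _result| DEFAULT_UNPARK_TOKEN,
///     )
/// }
/// ```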
#[inline]
pub unsafe fn unpark_requeue(
    key_from: usize,
    key_to: usize,
    validate: impl FnOnce() -> RequeueOp,
    callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given keys
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}
1001 | ||
1002 | /// Unparks a number of threads from the front of the queue associated with | |
1003 | /// `key` depending on the results of a filter function which inspects the | |
1004 | /// `ParkToken` associated with each thread. | |
1005 | /// | |
1006 | /// The `filter` function is called for each thread in the queue or until | |
1007 | /// `FilterOp::Stop` is returned. This function is passed the `ParkToken` | |
1008 | /// associated with a particular thread, which is unparked if `FilterOp::Unpark` | |
1009 | /// is returned. | |
1010 | /// | |
1011 | /// The `callback` function is also called while both queues are locked. It is | |
1012 | /// passed an `UnparkResult` indicating the number of threads that were unparked | |
1013 | /// and whether there are still parked threads in the queue. This `UnparkResult` | |
1014 | /// value is also returned by `unpark_filter`. | |
1015 | /// | |
1016 | /// The `callback` function should return an `UnparkToken` value which will be | |
1017 | /// passed to all threads that are unparked. If no thread is unparked then the | |
1018 | /// returned value is ignored. | |
1019 | /// | |
1020 | /// # Safety | |
1021 | /// | |
1022 | /// You should only call this function with an address that you control, since | |
1023 | /// you could otherwise interfere with the operation of other synchronization | |
1024 | /// primitives. | |
1025 | /// | |
1026 | /// The `filter` and `callback` functions are called while the queue is locked | |
1027 | /// and must not panic or call into any function in `parking_lot`. | |
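///
/// # Example
///
/// An illustrative sketch, not from the original documentation: unpark up to
/// `n` threads by counting in the filter (the `ParkToken` is ignored here; a
/// real user might encode priorities in it).
///
/// ```ignore
/// unsafe fn unpark_n(key: usize, n: usize) -> UnparkResult {
///     let mut remaining = n;
///     unpark_filter(
///         key,
///         |_token| {
///             if remaining > 0 {
///                 remaining -= 1;
///                 FilterOp::Unpark
///             } else {
///                 FilterOp::Stop
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     )
/// }
/// ```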
#[inline]
pub unsafe fn unpark_filter(
    key: usize,
    mut filter: impl FnMut(ParkToken) -> FilterOp,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}
1103 | ||
1104 | /// \[Experimental\] Deadlock detection | |
1105 | /// | |
1106 | /// Enabled via the `deadlock_detection` feature flag. | |
1107 | pub mod deadlock { | |
1108 | #[cfg(feature = "deadlock_detection")] | |
1109 | use super::deadlock_impl; | |
1110 | ||
1111 | #[cfg(feature = "deadlock_detection")] | |
1112 | pub(super) use super::deadlock_impl::DeadlockData; | |
1113 | ||
1114 | /// Acquire a resource identified by key in the deadlock detector | |
1115 | /// Noop if deadlock_detection feature isn't enabled. | |
1116 | /// | |
1117 | /// # Safety | |
1118 | /// | |
1119 | /// Call after the resource is acquired | |
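    ///
    /// # Example
    ///
    /// An illustrative sketch, not from the original documentation, of the
    /// intended pairing (using the lock's address as the key, as the rest of
    /// this crate does). `raw_lock` is a hypothetical lock object:
    ///
    /// ```ignore
    /// let key = &raw_lock as *const _ as usize;
    /// raw_lock.lock();
    /// unsafe { acquire_resource(key) }; // after acquiring
    /// // ... critical section ...
    /// unsafe { release_resource(key) }; // before releasing
    /// raw_lock.unlock();
    /// ```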
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    /// No-op if the `deadlock_detection` feature isn't enabled.
    ///
    /// # Panics
    ///
    /// Panics if the resource was already released or wasn't acquired in this thread.
    ///
    /// # Safety
    ///
    /// Call before the resource is released.
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}
1156 | ||
1157 | #[cfg(feature = "deadlock_detection")] | |
1158 | mod deadlock_impl { | |
1159 | use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS}; | |
1160 | use crate::thread_parker::{ThreadParkerT, UnparkHandleT}; | |
1161 | use crate::word_lock::WordLock; | |
1162 | use backtrace::Backtrace; | |
1163 | use petgraph; | |
1164 | use petgraph::graphmap::DiGraphMap; | |
1165 | use std::cell::{Cell, UnsafeCell}; | |
1166 | use std::collections::HashSet; | |
1167 | use std::sync::atomic::Ordering; | |
1168 | use std::sync::mpsc; | |
1169 | use thread_id; | |
1170 | ||
1171 | /// Representation of a deadlocked thread | |
1172 | pub struct DeadlockedThread { | |
1173 | thread_id: usize, | |
1174 | backtrace: Backtrace, | |
1175 | } | |
1176 | ||
1177 | impl DeadlockedThread { | |
1178 | /// The system thread id | |
1179 | pub fn thread_id(&self) -> usize { | |
1180 | self.thread_id | |
1181 | } | |
1182 | ||
1183 | /// The thread backtrace | |
1184 | pub fn backtrace(&self) -> &Backtrace { | |
1185 | &self.backtrace | |
1186 | } | |
1187 | } | |
1188 | ||
1189 | pub struct DeadlockData { | |
1190 | // Currently owned resources (keys) | |
1191 | resources: UnsafeCell<Vec<usize>>, | |
1192 | ||
1193 | // Set when there's a pending callstack request | |
1194 | deadlocked: Cell<bool>, | |
1195 | ||
1196 | // Sender used to report the backtrace | |
1197 | backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>, | |
1198 | ||
1199 | // System thread id | |
1200 | thread_id: usize, | |
1201 | } | |
1202 | ||
1203 | impl DeadlockData { | |
1204 | pub fn new() -> Self { | |
1205 | DeadlockData { | |
1206 | resources: UnsafeCell::new(Vec::new()), | |
1207 | deadlocked: Cell::new(false), | |
1208 | backtrace_sender: UnsafeCell::new(None), | |
1209 | thread_id: thread_id::get(), | |
1210 | } | |
1211 | } | |
1212 | } | |
1213 | ||
1214 | pub(super) unsafe fn on_unpark(td: &ThreadData) { | |
1215 | if td.deadlock_data.deadlocked.get() { | |
1216 | let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap(); | |
1217 | sender | |
1218 | .send(DeadlockedThread { | |
1219 | thread_id: td.deadlock_data.thread_id, | |
1220 | backtrace: Backtrace::new(), | |
1221 | }) | |
1222 | .unwrap(); | |
1223 | // make sure to close this sender | |
1224 | drop(sender); | |
1225 | ||
1226 | // park until the end of the time | |
1227 | td.parker.prepare_park(); | |
1228 | td.parker.park(); | |
1229 | unreachable!("unparked deadlocked thread!"); | |
1230 | } | |
1231 | } | |
1232 | ||
1233 | pub unsafe fn acquire_resource(key: usize) { | |
1234 | with_thread_data(|thread_data| { | |
1235 | (*thread_data.deadlock_data.resources.get()).push(key); | |
1236 | }); | |
1237 | } | |
1238 | ||
1239 | pub unsafe fn release_resource(key: usize) { | |
1240 | with_thread_data(|thread_data| { | |
1241 | let resources = &mut (*thread_data.deadlock_data.resources.get()); | |
1242 | ||
1243 | // There is only one situation where we can fail to find the | |
1244 | // resource: we are currently running TLS destructors and our | |
1245 | // ThreadData has already been freed. There isn't much we can do | |
1246 | // about it at this point, so just ignore it. | |
1247 | if let Some(p) = resources.iter().rposition(|x| *x == key) { | |
1248 | resources.swap_remove(p); | |
1249 | } | |
1250 | }); | |
1251 | } | |
1252 | ||
1253 | pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> { | |
1254 | unsafe { | |
1255 | // fast pass | |
1256 | if check_wait_graph_fast() { | |
1257 | // double check | |
1258 | check_wait_graph_slow() | |
1259 | } else { | |
1260 | Vec::new() | |
1261 | } | |
1262 | } | |
1263 | } | |
1264 | ||
1265 | // Simple algorithm that builds a wait graph f the threads and the resources, | |
1266 | // then checks for the presence of cycles (deadlocks). | |
1267 | // This variant isn't precise as it doesn't lock the entire table before checking | |
1268 | unsafe fn check_wait_graph_fast() -> bool { | |
1269 | let table = get_hashtable(); | |
1270 | let thread_count = NUM_THREADS.load(Ordering::Relaxed); | |
1271 | let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2); | |
1272 | ||
1273 | for b in &(*table).entries[..] { | |
1274 | b.mutex.lock(); | |
1275 | let mut current = b.queue_head.get(); | |
1276 | while !current.is_null() { | |
1277 | if !(*current).parked_with_timeout.get() | |
1278 | && !(*current).deadlock_data.deadlocked.get() | |
1279 | { | |
1280 | // .resources are waiting for their owner | |
1281 | for &resource in &(*(*current).deadlock_data.resources.get()) { | |
1282 | graph.add_edge(resource, current as usize, ()); | |
1283 | } | |
1284 | // owner waits for resource .key | |
1285 | graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ()); | |
1286 | } | |
1287 | current = (*current).next_in_queue.get(); | |
1288 | } | |
1289 | // SAFETY: We hold the lock here, as required | |
1290 | b.mutex.unlock(); | |
1291 | } | |
1292 | ||
1293 | petgraph::algo::is_cyclic_directed(&graph) | |
1294 | } | |
1295 | ||
1296 | #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] | |
1297 | enum WaitGraphNode { | |
1298 | Thread(*const ThreadData), | |
1299 | Resource(usize), | |
1300 | } | |
1301 | ||
1302 | use self::WaitGraphNode::*; | |
1303 | ||
1304 | // Contrary to the _fast variant this locks the entries table before looking for cycles. | |
1305 | // Returns all detected thread wait cycles. | |
1306 | // Note that once a cycle is reported it's never reported again. | |
1307 | unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> { | |
1308 | static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new(); | |
1309 | DEADLOCK_DETECTION_LOCK.lock(); | |
1310 | ||
1311 | let mut table = get_hashtable(); | |
1312 | loop { | |
1313 | // Lock all buckets in the old table | |
1314 | for b in &table.entries[..] { | |
1315 | b.mutex.lock(); | |
1316 | } | |
1317 | ||
1318 | // Now check if our table is still the latest one. Another thread could | |
1319 | // have grown the hash table between us getting and locking the hash table. | |
1320 | let new_table = get_hashtable(); | |
1321 | if new_table as *const _ == table as *const _ { | |
1322 | break; | |
1323 | } | |
1324 | ||
1325 | // Unlock buckets and try again | |
1326 | for b in &table.entries[..] { | |
1327 | // SAFETY: We hold the lock here, as required | |
1328 | b.mutex.unlock(); | |
1329 | } | |
1330 | ||
1331 | table = new_table; | |
1332 | } | |
1333 | ||
1334 | let thread_count = NUM_THREADS.load(Ordering::Relaxed); | |
1335 | let mut graph = | |
1336 | DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2); | |
1337 | ||
1338 | for b in &table.entries[..] { | |
1339 | let mut current = b.queue_head.get(); | |
1340 | while !current.is_null() { | |
1341 | if !(*current).parked_with_timeout.get() | |
1342 | && !(*current).deadlock_data.deadlocked.get() | |
1343 | { | |
1344 | // .resources are waiting for their owner | |
1345 | for &resource in &(*(*current).deadlock_data.resources.get()) { | |
1346 | graph.add_edge(Resource(resource), Thread(current), ()); | |
1347 | } | |
1348 | // owner waits for resource .key | |
1349 | graph.add_edge( | |
1350 | Thread(current), | |
1351 | Resource((*current).key.load(Ordering::Relaxed)), | |
1352 | (), | |
1353 | ); | |
1354 | } | |
1355 | current = (*current).next_in_queue.get(); | |
1356 | } | |
1357 | } | |
1358 | ||
1359 | for b in &table.entries[..] { | |
1360 | // SAFETY: We hold the lock here, as required | |
1361 | b.mutex.unlock(); | |
1362 | } | |
1363 | ||
1364 | // find cycles | |
1365 | let cycles = graph_cycles(&graph); | |
1366 | ||
1367 | let mut results = Vec::with_capacity(cycles.len()); | |
1368 | ||
1369 | for cycle in cycles { | |
1370 | let (sender, receiver) = mpsc::channel(); | |
1371 | for td in cycle { | |
1372 | let bucket = lock_bucket((*td).key.load(Ordering::Relaxed)); | |
1373 | (*td).deadlock_data.deadlocked.set(true); | |
1374 | *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone()); | |
1375 | let handle = (*td).parker.unpark_lock(); | |
1376 | // SAFETY: We hold the lock here, as required | |
1377 | bucket.mutex.unlock(); | |
1378 | // unpark the deadlocked thread! | |
1379 | // on unpark it'll notice the deadlocked flag and report back | |
1380 | handle.unpark(); | |
1381 | } | |
1382 | // make sure to drop our sender before collecting results | |
1383 | drop(sender); | |
1384 | results.push(receiver.iter().collect()); | |
1385 | } | |
1386 | ||
1387 | DEADLOCK_DETECTION_LOCK.unlock(); | |
1388 | ||
1389 | results | |
1390 | } | |
1391 | ||
1392 | // normalize a cycle to start with the "smallest" node | |
1393 | fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> { | |
1394 | let min_pos = input | |
1395 | .iter() | |
1396 | .enumerate() | |
1397 | .min_by_key(|&(_, &t)| t) | |
1398 | .map(|(p, _)| p) | |
1399 | .unwrap_or(0); | |
1400 | input | |
1401 | .iter() | |
1402 | .cycle() | |
1403 | .skip(min_pos) | |
1404 | .take(input.len()) | |
1405 | .cloned() | |
1406 | .collect() | |
1407 | } | |
1408 | ||
1409 | // returns all thread cycles in the wait graph | |
1410 | fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> { | |
1411 | use petgraph::visit::depth_first_search; | |
1412 | use petgraph::visit::DfsEvent; | |
1413 | use petgraph::visit::NodeIndexable; | |
1414 | ||
1415 | let mut cycles = HashSet::new(); | |
1416 | let mut path = Vec::with_capacity(g.node_bound()); | |
1417 | // start from threads to get the correct threads cycle | |
1418 | let threads = g | |
1419 | .nodes() | |
1420 | .filter(|n| if let &Thread(_) = n { true } else { false }); | |
1421 | ||
1422 | depth_first_search(g, threads, |e| match e { | |
1423 | DfsEvent::Discover(Thread(n), _) => path.push(n), | |
1424 | DfsEvent::Finish(Thread(_), _) => { | |
1425 | path.pop(); | |
1426 | } | |
1427 | DfsEvent::BackEdge(_, Thread(n)) => { | |
1428 | let from = path.iter().rposition(|&i| i == n).unwrap(); | |
1429 | cycles.insert(normalize_cycle(&path[from..])); | |
1430 | } | |
1431 | _ => (), | |
1432 | }); | |
1433 | ||
1434 | cycles.iter().cloned().collect() | |
1435 | } | |
1436 | } | |
1437 | ||
1438 | #[cfg(test)] | |
1439 | mod tests { | |
1440 | use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN}; | |
1441 | use std::{ | |
1442 | ptr, | |
1443 | sync::{ | |
1444 | atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering}, | |
1445 | Arc, | |
1446 | }, | |
1447 | thread, | |
1448 | time::Duration, | |
1449 | }; | |
1450 | ||
1451 | /// Calls a closure for every `ThreadData` currently parked on a given key | |
1452 | fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) { | |
1453 | let bucket = super::lock_bucket(key); | |
1454 | ||
1455 | let mut current: *const ThreadData = bucket.queue_head.get(); | |
1456 | while !current.is_null() { | |
1457 | let current_ref = unsafe { &*current }; | |
1458 | if current_ref.key.load(Ordering::Relaxed) == key { | |
1459 | f(current_ref); | |
1460 | } | |
1461 | current = current_ref.next_in_queue.get(); | |
1462 | } | |
1463 | ||
1464 | // SAFETY: We hold the lock here, as required | |
1465 | unsafe { bucket.mutex.unlock() }; | |
1466 | } | |
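    // Example use (as in `finish` below): counting the threads parked on an
    // address is just
    //
    //     let mut n = 0;
    //     for_each(addr, |_| n += 1);
    //
    // where `addr` is any key previously passed to `park`.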

    macro_rules! test {
        ( $( $name:ident(
            repeats: $repeats:expr,
            latches: $latches:expr,
            delay: $delay:expr,
            threads: $threads:expr,
            single_unparks: $single_unparks:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_micros($delay);
                for _ in 0..$repeats {
                    run_parking_test($latches, delay, $threads, $single_unparks);
                }
            })*
        };
    }

    test! {
        unpark_all_one_fast(
            repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
        );
        unpark_all_hundred_fast(
            repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
        );
        unpark_one_one_fast(
            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
        );
        unpark_one_hundred_fast(
            repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
        );
        unpark_one_fifty_then_fifty_all_fast(
            repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
        );
        unpark_all_one(
            repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
        );
        unpark_all_hundred(
            repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
        );
        unpark_one_one(
            repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
        );
        unpark_one_fifty(
            repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
        );
        unpark_one_fifty_then_fifty_all(
            repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
        );
        hundred_unpark_all_one_fast(
            repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
        );
        hundred_unpark_all_one(
            repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
        );
    }
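    // For instance, the `unpark_one_one_fast` case above expands to:
    //
    //     #[test]
    //     fn unpark_one_one_fast() {
    //         let delay = Duration::from_micros(0);
    //         for _ in 0..1000 {
    //             run_parking_test(1, delay, 1, 1);
    //         }
    //     }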

    fn run_parking_test(
        num_latches: usize,
        delay: Duration,
        num_threads: usize,
        num_single_unparks: usize,
    ) {
        let mut tests = Vec::with_capacity(num_latches);

        for _ in 0..num_latches {
            let test = Arc::new(SingleLatchTest::new(num_threads));
            let mut threads = Vec::with_capacity(num_threads);
            for _ in 0..num_threads {
                let test = test.clone();
                threads.push(thread::spawn(move || test.run()));
            }
            tests.push((test, threads));
        }

        for unpark_index in 0..num_single_unparks {
            thread::sleep(delay);
            for (test, _) in &tests {
                test.unpark_one(unpark_index);
            }
        }

        for (test, threads) in tests {
            test.finish(num_single_unparks);
            for thread in threads {
                thread.join().expect("Test thread panic");
            }
        }
    }
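    // Each run has two phases: `num_single_unparks` rounds of `unpark_one`, each
    // expected to wake exactly the oldest parked thread, followed by `finish`,
    // which drains whatever is left with `unpark_all`.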

    struct SingleLatchTest {
        semaphore: AtomicIsize,
        num_awake: AtomicUsize,
        /// Holds the pointer to the last *unprocessed* woken up thread.
        last_awoken: AtomicPtr<ThreadData>,
        /// Total number of threads participating in this test.
        num_threads: usize,
    }
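    // Semaphore encoding (implied by `down`/`up` below): a positive counter is
    // the number of free slots, while a negative counter is the number of
    // threads currently waiting (or about to park) on `semaphore_addr`.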

    impl SingleLatchTest {
        pub fn new(num_threads: usize) -> Self {
            Self {
                // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                semaphore: AtomicIsize::new(0),
                num_awake: AtomicUsize::new(0),
                last_awoken: AtomicPtr::new(ptr::null_mut()),
                num_threads,
            }
        }

        pub fn run(&self) {
            // Get one slot from the semaphore
            self.down();

            // Report back to the test verification code that this thread woke up
            let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
            self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
            self.num_awake.fetch_add(1, Ordering::SeqCst);
        }

        pub fn unpark_one(&self, single_unpark_index: usize) {
            // `last_awoken` must be null at all times, except between the call to
            // `self.up()` below and the reset back to null at the end of this method.
            assert!(self.last_awoken.load(Ordering::SeqCst).is_null());

            let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
            for_each(self.semaphore_addr(), |thread_data| {
                queue.push(thread_data as *const _ as *mut _);
            });
            assert!(queue.len() <= self.num_threads - single_unpark_index);

            let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);

            self.up();

            // Wait for a parked thread to wake up and update num_awake + last_awoken.
            while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
                thread::yield_now();
            }

            // At this point the other thread should have set last_awoken inside the run() method
            let last_awoken = self.last_awoken.load(Ordering::SeqCst);
            assert!(!last_awoken.is_null());
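            // The snapshot taken above lists parked threads in FIFO order, so the
            // thread that just woke must match the head of that snapshot; an empty
            // snapshot only means no thread had parked yet when we looked.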
            if !queue.is_empty() && queue[0] != last_awoken {
                panic!(
                    "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
                    queue, last_awoken
                );
            }
            self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
        }

        pub fn finish(&self, num_single_unparks: usize) {
            // The number of threads not unparked via unpark_one
            let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();

            // Wake the remaining threads up with unpark_all. This has to be done in a
            // loop, because there might still be threads that have not yet parked.
            while num_threads_left > 0 {
                let mut num_waiting_on_address = 0;
                for_each(self.semaphore_addr(), |_thread_data| {
                    num_waiting_on_address += 1;
                });
                assert!(num_waiting_on_address <= num_threads_left);

                let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);

                let num_unparked =
                    unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
                assert!(num_unparked >= num_waiting_on_address);
                assert!(num_unparked <= num_threads_left);

                // Wait for all unparked threads to wake up and update num_awake + last_awoken.
                while self.num_awake.load(Ordering::SeqCst)
                    != num_awake_before_unpark + num_unparked
                {
                    thread::yield_now()
                }

                num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
            }
            // By now, all threads should have been woken up
            assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

            // Make sure no thread is parked on our semaphore address
            let mut num_waiting_on_address = 0;
            for_each(self.semaphore_addr(), |_thread_data| {
                num_waiting_on_address += 1;
            });
            assert_eq!(num_waiting_on_address, 0);
        }

        pub fn down(&self) {
            let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);

            if old_semaphore_value > 0 {
                // We acquired the semaphore. Done.
                return;
            }

            // We need to wait.
            let validate = || true;
            let before_sleep = || {};
            let timed_out = |_, _| {};
            unsafe {
                super::park(
                    self.semaphore_addr(),
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        }
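        // For example: starting from one free slot, `down` sees the old value 1
        // and returns immediately; a second `down` sees 0 (the counter is now -1)
        // and parks on `semaphore_addr`, to be woken by a matching `up`.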

        pub fn up(&self) {
            let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);

            // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
            if old_semaphore_value < 0 {
                // We need to continue until we have actually unparked someone. It might be that
                // the thread we want to pass ownership to has decremented the semaphore counter,
                // but not yet parked.
                loop {
                    match unsafe {
                        super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
                            .unparked_threads
                    } {
                        1 => break,
                        0 => (),
                        i => panic!("Should not wake up {} threads", i),
                    }
                }
            }
        }

        fn semaphore_addr(&self) -> usize {
            &self.semaphore as *const _ as usize
        }
    }
}