// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{GuardNoSend, RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
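//
// As a sketch, both keys are derived from the lock's own address (this
// mirrors the code in `lock_common`, `wait_for_readers` and
// `unlock_shared_slow` below):
//
//     let primary = self as *const _ as usize;       // all new waiters
//     let secondary = self as *const _ as usize + 1; // writer awaiting readers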
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read
// phases and write phases. In particular it is not vulnerable to write
// starvation since readers will block if there is a pending writer.
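//
// For example (illustrative): if reader R1 holds the lock, writer W queues,
// and reader R2 arrives afterwards, R2 parks behind W instead of joining R1's
// read phase; once R1 releases, W is woken for its write phase, and when W
// unlocks, R2 is woken for the next read phase.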

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;
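
// A worked example of the layout (illustrative, not part of the original
// source): state == 0b10_0101 encodes a reader count of 2 (0b10 << 4), one of
// those readers holding the upgradable lock (UPGRADABLE_BIT), and at least one
// queued thread (PARKED_BIT). The flag bits never collide with the count:
const _: () = assert!(READERS_MASK & (PARKED_BIT | WRITER_PARKED_BIT | UPGRADABLE_BIT | WRITER_BIT) == 0);
const _: () = assert!(ONE_READER == 1 << 4);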

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
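
// The token values are chosen so that `wake_parked_threads` below can simply
// add each unparked thread's token to the new lock state. For example
// (illustrative), unparking one shared reader and one upgradable reader
// accumulates:
//     new_state = 0 + ONE_READER + (ONE_READER | UPGRADABLE_BIT)
//               = 0b10_0100  // reader count of 2, UPGRADABLE_BIT set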

/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = GuardNoSend;

    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            self.unlock_exclusive_slow(false);
        }
    }

    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };

        // The slow path is only needed if we were the last reader and a
        // writer is parked on the secondary queue waiting for the reader
        // count to reach zero.
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_err()
        {
            self.unlock_exclusive_slow(true);
        }
    }

    fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(_) => (),
            }
        }
        self.unlock_upgradable_slow(false);
    }

    fn upgrade(&self) {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(_) => (),
            }
        }
        self.unlock_upgradable_slow(false);
    }

    fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }
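
    // (Illustrative note, not in the original: `checked_add` guards against
    // the reader count overflowing into the flag bits; with ONE_READER ==
    // 1 << 4 on a 64-bit target that would take roughly 2^60 concurrent or
    // recursive read locks.)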

    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        // * `addr` is an address we control.
        // * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing WRITER_BIT or
    /// UPGRADABLE_BIT.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * `callback` safety responsibility is on caller
        parking_lot_core::unpark_filter(addr, filter, callback);
    }
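
    // An illustrative trace of the filter above (not in the original source):
    // with a queue of [reader, reader, writer, reader], the filter unparks
    // both leading readers (accumulating ONE_READER each), unparks the writer
    // (accumulating WRITER_BIT), then returns FilterOp::Stop for the trailing
    // reader, which stays parked until the writer's own unlock.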

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Relaxed);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }
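
    // Worked example of the timeout revert above (illustrative): when called
    // from `upgrade_slow`, `prev_value` is ONE_READER | UPGRADABLE_BIT, so the
    // wrapping fetch_add clears WRITER_BIT and WRITER_PARKED_BIT while
    // restoring the original upgradable read lock in one atomic step.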

    /// Common code for acquiring a lock
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }
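
    // (Illustrative note, not in the original: `validate_flags` encodes why
    // the thread is parking. A shared locker passes WRITER_BIT, so the park
    // is aborted if the blocking writer has already released the lock; an
    // exclusive locker passes WRITER_BIT | UPGRADABLE_BIT.)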

    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
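
// A minimal usage sketch (hypothetical test, not part of the original file).
// `RawRwLock` is normally consumed through the `lock_api` wrappers; the
// `RwLock` alias below mirrors the one exported from the crate root.
#[cfg(test)]
mod usage_sketch {
    use super::RawRwLock;

    type RwLock<T> = lock_api::RwLock<RawRwLock, T>;

    #[test]
    fn shared_then_exclusive() {
        let lock = RwLock::new(5);
        {
            // Multiple shared guards may coexist.
            let r1 = lock.read();
            let r2 = lock.read();
            assert_eq!(*r1 + *r2, 10);
        }
        // An exclusive guard requires all readers to be gone.
        *lock.write() += 1;
        assert_eq!(*lock.read(), 6);
    }
}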