]> git.proxmox.com Git - rustc.git/blob - vendor/parking_lot/src/raw_rwlock.rs
New upstream version 1.44.1+dfsg1
[rustc.git] / vendor / parking_lot / src / raw_rwlock.rs
1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7
8 use crate::elision::{have_elision, AtomicElisionExt};
9 use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
10 use crate::util;
11 use core::{
12 cell::Cell,
13 sync::atomic::{AtomicUsize, Ordering},
14 };
15 use lock_api::{GuardNoSend, RawRwLock as RawRwLock_, RawRwLockUpgrade};
16 use parking_lot_core::{
17 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
18 };
19 use std::time::{Duration, Instant};
20
21 // This reader-writer lock implementation is based on Boost's upgrade_mutex:
22 // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
23 //
24 // This implementation uses 2 wait queues, one at key [addr] and one at key
25 // [addr + 1]. The primary queue is used for all new waiting threads, and the
26 // secondary queue is used by the thread which has acquired WRITER_BIT but is
27 // waiting for the remaining readers to exit the lock.
28 //
29 // This implementation is fair between readers and writers since it uses the
30 // order in which threads first started queuing to alternate between read phases
31 // and write phases. In particular, it is not vulnerable to write starvation
32 // since readers will block if there is a pending writer.
33
// Layout of the lock state word: the four low bits are flags, every bit
// above them belongs to the reader counter.

// Flag: the main queue contains at least one parked thread.
const PARKED_BIT: usize = 1 << 0;
// Flag: the thread that holds WRITER_BIT is itself parked. Only meaningful
// while WRITER_BIT is set.
const WRITER_PARKED_BIT: usize = 1 << 1;
// Flag: one of the readers holds an upgradable lock. Requires a non-zero
// reader count and WRITER_BIT being clear.
const UPGRADABLE_BIT: usize = 1 << 2;
// Flag: with a zero reader count, a writer holds the lock exclusively;
// with a non-zero reader count, a writer is waiting for those readers to
// leave.
const WRITER_BIT: usize = 1 << 3;
// Increment that represents a single reader in the counter.
const ONE_READER: usize = 1 << 4;
// Selects the reader counter, i.e. everything above the four flag bits.
const READERS_MASK: usize = !(ONE_READER - 1);
48
49 // Token indicating what type of lock a queued thread is trying to acquire
50 const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
51 const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
52 const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
53
/// Raw reader-writer lock type backed by the parking lot.
///
/// The whole lock lives in a single atomic word: the low four bits are the
/// `PARKED_BIT`, `WRITER_PARKED_BIT`, `UPGRADABLE_BIT` and `WRITER_BIT`
/// flags, and the remaining bits count readers in units of `ONE_READER`.
pub struct RawRwLock {
    // Flags and reader counter, packed as described on the struct.
    state: AtomicUsize,
}
58
59 unsafe impl lock_api::RawRwLock for RawRwLock {
60 const INIT: RawRwLock = RawRwLock {
61 state: AtomicUsize::new(0),
62 };
63
64 type GuardMarker = GuardNoSend;
65
66 #[inline]
67 fn lock_exclusive(&self) {
68 if self
69 .state
70 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
71 .is_err()
72 {
73 let result = self.lock_exclusive_slow(None);
74 debug_assert!(result);
75 }
76 self.deadlock_acquire();
77 }
78
79 #[inline]
80 fn try_lock_exclusive(&self) -> bool {
81 if self
82 .state
83 .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
84 .is_ok()
85 {
86 self.deadlock_acquire();
87 true
88 } else {
89 false
90 }
91 }
92
93 #[inline]
94 fn unlock_exclusive(&self) {
95 self.deadlock_release();
96 if self
97 .state
98 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
99 .is_ok()
100 {
101 return;
102 }
103 self.unlock_exclusive_slow(false);
104 }
105
106 #[inline]
107 fn lock_shared(&self) {
108 if !self.try_lock_shared_fast(false) {
109 let result = self.lock_shared_slow(false, None);
110 debug_assert!(result);
111 }
112 self.deadlock_acquire();
113 }
114
115 #[inline]
116 fn try_lock_shared(&self) -> bool {
117 let result = if self.try_lock_shared_fast(false) {
118 true
119 } else {
120 self.try_lock_shared_slow(false)
121 };
122 if result {
123 self.deadlock_acquire();
124 }
125 result
126 }
127
128 #[inline]
129 fn unlock_shared(&self) {
130 self.deadlock_release();
131 let state = if have_elision() {
132 self.state.elision_fetch_sub_release(ONE_READER)
133 } else {
134 self.state.fetch_sub(ONE_READER, Ordering::Release)
135 };
136 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
137 self.unlock_shared_slow();
138 }
139 }
140 }
141
142 unsafe impl lock_api::RawRwLockFair for RawRwLock {
143 #[inline]
144 fn unlock_shared_fair(&self) {
145 // Shared unlocking is always fair in this implementation.
146 self.unlock_shared();
147 }
148
149 #[inline]
150 fn unlock_exclusive_fair(&self) {
151 self.deadlock_release();
152 if self
153 .state
154 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
155 .is_ok()
156 {
157 return;
158 }
159 self.unlock_exclusive_slow(true);
160 }
161
162 #[inline]
163 fn bump_shared(&self) {
164 if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
165 == ONE_READER | WRITER_BIT
166 {
167 self.bump_shared_slow();
168 }
169 }
170
171 #[inline]
172 fn bump_exclusive(&self) {
173 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
174 self.bump_exclusive_slow();
175 }
176 }
177 }
178
179 unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
180 #[inline]
181 fn downgrade(&self) {
182 let state = self
183 .state
184 .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);
185
186 // Wake up parked shared and upgradable threads if there are any
187 if state & PARKED_BIT != 0 {
188 self.downgrade_slow();
189 }
190 }
191 }
192
193 unsafe impl lock_api::RawRwLockTimed for RawRwLock {
194 type Duration = Duration;
195 type Instant = Instant;
196
197 #[inline]
198 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
199 let result = if self.try_lock_shared_fast(false) {
200 true
201 } else {
202 self.lock_shared_slow(false, util::to_deadline(timeout))
203 };
204 if result {
205 self.deadlock_acquire();
206 }
207 result
208 }
209
210 #[inline]
211 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
212 let result = if self.try_lock_shared_fast(false) {
213 true
214 } else {
215 self.lock_shared_slow(false, Some(timeout))
216 };
217 if result {
218 self.deadlock_acquire();
219 }
220 result
221 }
222
223 #[inline]
224 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
225 let result = if self
226 .state
227 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
228 .is_ok()
229 {
230 true
231 } else {
232 self.lock_exclusive_slow(util::to_deadline(timeout))
233 };
234 if result {
235 self.deadlock_acquire();
236 }
237 result
238 }
239
240 #[inline]
241 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
242 let result = if self
243 .state
244 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
245 .is_ok()
246 {
247 true
248 } else {
249 self.lock_exclusive_slow(Some(timeout))
250 };
251 if result {
252 self.deadlock_acquire();
253 }
254 result
255 }
256 }
257
258 unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
259 #[inline]
260 fn lock_shared_recursive(&self) {
261 if !self.try_lock_shared_fast(true) {
262 let result = self.lock_shared_slow(true, None);
263 debug_assert!(result);
264 }
265 self.deadlock_acquire();
266 }
267
268 #[inline]
269 fn try_lock_shared_recursive(&self) -> bool {
270 let result = if self.try_lock_shared_fast(true) {
271 true
272 } else {
273 self.try_lock_shared_slow(true)
274 };
275 if result {
276 self.deadlock_acquire();
277 }
278 result
279 }
280 }
281
282 unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
283 #[inline]
284 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
285 let result = if self.try_lock_shared_fast(true) {
286 true
287 } else {
288 self.lock_shared_slow(true, util::to_deadline(timeout))
289 };
290 if result {
291 self.deadlock_acquire();
292 }
293 result
294 }
295
296 #[inline]
297 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
298 let result = if self.try_lock_shared_fast(true) {
299 true
300 } else {
301 self.lock_shared_slow(true, Some(timeout))
302 };
303 if result {
304 self.deadlock_acquire();
305 }
306 result
307 }
308 }
309
310 unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
311 #[inline]
312 fn lock_upgradable(&self) {
313 if !self.try_lock_upgradable_fast() {
314 let result = self.lock_upgradable_slow(None);
315 debug_assert!(result);
316 }
317 self.deadlock_acquire();
318 }
319
320 #[inline]
321 fn try_lock_upgradable(&self) -> bool {
322 let result = if self.try_lock_upgradable_fast() {
323 true
324 } else {
325 self.try_lock_upgradable_slow()
326 };
327 if result {
328 self.deadlock_acquire();
329 }
330 result
331 }
332
333 #[inline]
334 fn unlock_upgradable(&self) {
335 self.deadlock_release();
336 let state = self.state.load(Ordering::Relaxed);
337 if state & PARKED_BIT == 0 {
338 if self
339 .state
340 .compare_exchange_weak(
341 state,
342 state - (ONE_READER | UPGRADABLE_BIT),
343 Ordering::Release,
344 Ordering::Relaxed,
345 )
346 .is_ok()
347 {
348 return;
349 }
350 }
351 self.unlock_upgradable_slow(false);
352 }
353
354 #[inline]
355 fn upgrade(&self) {
356 let state = self.state.fetch_sub(
357 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
358 Ordering::Relaxed,
359 );
360 if state & READERS_MASK != ONE_READER {
361 let result = self.upgrade_slow(None);
362 debug_assert!(result);
363 }
364 }
365
366 #[inline]
367 fn try_upgrade(&self) -> bool {
368 if self
369 .state
370 .compare_exchange_weak(
371 ONE_READER | UPGRADABLE_BIT,
372 WRITER_BIT,
373 Ordering::Relaxed,
374 Ordering::Relaxed,
375 )
376 .is_ok()
377 {
378 true
379 } else {
380 self.try_upgrade_slow()
381 }
382 }
383 }
384
385 unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
386 #[inline]
387 fn unlock_upgradable_fair(&self) {
388 self.deadlock_release();
389 let state = self.state.load(Ordering::Relaxed);
390 if state & PARKED_BIT == 0 {
391 if self
392 .state
393 .compare_exchange_weak(
394 state,
395 state - (ONE_READER | UPGRADABLE_BIT),
396 Ordering::Release,
397 Ordering::Relaxed,
398 )
399 .is_ok()
400 {
401 return;
402 }
403 }
404 self.unlock_upgradable_slow(false);
405 }
406
407 #[inline]
408 fn bump_upgradable(&self) {
409 if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
410 self.bump_upgradable_slow();
411 }
412 }
413 }
414
415 unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
416 #[inline]
417 fn downgrade_upgradable(&self) {
418 let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);
419
420 // Wake up parked upgradable threads if there are any
421 if state & PARKED_BIT != 0 {
422 self.downgrade_slow();
423 }
424 }
425
426 #[inline]
427 fn downgrade_to_upgradable(&self) {
428 let state = self.state.fetch_add(
429 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
430 Ordering::Release,
431 );
432
433 // Wake up parked shared threads if there are any
434 if state & PARKED_BIT != 0 {
435 self.downgrade_to_upgradable_slow();
436 }
437 }
438 }
439
440 unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
441 #[inline]
442 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
443 let result = if self.try_lock_upgradable_fast() {
444 true
445 } else {
446 self.lock_upgradable_slow(Some(timeout))
447 };
448 if result {
449 self.deadlock_acquire();
450 }
451 result
452 }
453
454 #[inline]
455 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
456 let result = if self.try_lock_upgradable_fast() {
457 true
458 } else {
459 self.lock_upgradable_slow(util::to_deadline(timeout))
460 };
461 if result {
462 self.deadlock_acquire();
463 }
464 result
465 }
466
467 #[inline]
468 fn try_upgrade_until(&self, timeout: Instant) -> bool {
469 let state = self.state.fetch_sub(
470 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
471 Ordering::Relaxed,
472 );
473 if state & READERS_MASK == ONE_READER {
474 true
475 } else {
476 self.upgrade_slow(Some(timeout))
477 }
478 }
479
480 #[inline]
481 fn try_upgrade_for(&self, timeout: Duration) -> bool {
482 let state = self.state.fetch_sub(
483 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
484 Ordering::Relaxed,
485 );
486 if state & READERS_MASK == ONE_READER {
487 true
488 } else {
489 self.upgrade_slow(util::to_deadline(timeout))
490 }
491 }
492 }
493
494 impl RawRwLock {
495 #[inline(always)]
496 fn try_lock_shared_fast(&self, recursive: bool) -> bool {
497 let state = self.state.load(Ordering::Relaxed);
498
499 // We can't allow grabbing a shared lock if there is a writer, even if
500 // the writer is still waiting for the remaining readers to exit.
501 if state & WRITER_BIT != 0 {
502 // To allow recursive locks, we make an exception and allow readers
503 // to skip ahead of a pending writer to avoid deadlocking, at the
504 // cost of breaking the fairness guarantees.
505 if !recursive || state & READERS_MASK == 0 {
506 return false;
507 }
508 }
509
510 // Use hardware lock elision to avoid cache conflicts when multiple
511 // readers try to acquire the lock. We only do this if the lock is
512 // completely empty since elision handles conflicts poorly.
513 if have_elision() && state == 0 {
514 self.state
515 .elision_compare_exchange_acquire(0, ONE_READER)
516 .is_ok()
517 } else if let Some(new_state) = state.checked_add(ONE_READER) {
518 self.state
519 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
520 .is_ok()
521 } else {
522 false
523 }
524 }
525
526 #[cold]
527 fn try_lock_shared_slow(&self, recursive: bool) -> bool {
528 let mut state = self.state.load(Ordering::Relaxed);
529 loop {
530 // This mirrors the condition in try_lock_shared_fast
531 if state & WRITER_BIT != 0 {
532 if !recursive || state & READERS_MASK == 0 {
533 return false;
534 }
535 }
536 if have_elision() && state == 0 {
537 match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
538 Ok(_) => return true,
539 Err(x) => state = x,
540 }
541 } else {
542 match self.state.compare_exchange_weak(
543 state,
544 state
545 .checked_add(ONE_READER)
546 .expect("RwLock reader count overflow"),
547 Ordering::Acquire,
548 Ordering::Relaxed,
549 ) {
550 Ok(_) => return true,
551 Err(x) => state = x,
552 }
553 }
554 }
555 }
556
557 #[inline(always)]
558 fn try_lock_upgradable_fast(&self) -> bool {
559 let state = self.state.load(Ordering::Relaxed);
560
561 // We can't grab an upgradable lock if there is already a writer or
562 // upgradable reader.
563 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
564 return false;
565 }
566
567 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
568 self.state
569 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
570 .is_ok()
571 } else {
572 false
573 }
574 }
575
576 #[cold]
577 fn try_lock_upgradable_slow(&self) -> bool {
578 let mut state = self.state.load(Ordering::Relaxed);
579 loop {
580 // This mirrors the condition in try_lock_upgradable_fast
581 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
582 return false;
583 }
584
585 match self.state.compare_exchange_weak(
586 state,
587 state
588 .checked_add(ONE_READER | UPGRADABLE_BIT)
589 .expect("RwLock reader count overflow"),
590 Ordering::Acquire,
591 Ordering::Relaxed,
592 ) {
593 Ok(_) => return true,
594 Err(x) => state = x,
595 }
596 }
597 }
598
599 #[cold]
600 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
601 let try_lock = |state: &mut usize| {
602 loop {
603 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
604 return false;
605 }
606
607 // Grab WRITER_BIT if it isn't set, even if there are parked threads.
608 match self.state.compare_exchange_weak(
609 *state,
610 *state | WRITER_BIT,
611 Ordering::Acquire,
612 Ordering::Relaxed,
613 ) {
614 Ok(_) => return true,
615 Err(x) => *state = x,
616 }
617 }
618 };
619
620 // Step 1: grab exclusive ownership of WRITER_BIT
621 let timed_out = !self.lock_common(
622 timeout,
623 TOKEN_EXCLUSIVE,
624 try_lock,
625 WRITER_BIT | UPGRADABLE_BIT,
626 );
627 if timed_out {
628 return false;
629 }
630
631 // Step 2: wait for all remaining readers to exit the lock.
632 self.wait_for_readers(timeout, 0)
633 }
634
635 #[cold]
636 fn unlock_exclusive_slow(&self, force_fair: bool) {
637 // There are threads to unpark. Try to unpark as many as we can.
638 let callback = |mut new_state, result: UnparkResult| {
639 // If we are using a fair unlock then we should keep the
640 // rwlock locked and hand it off to the unparked threads.
641 if result.unparked_threads != 0 && (force_fair || result.be_fair) {
642 if result.have_more_threads {
643 new_state |= PARKED_BIT;
644 }
645 self.state.store(new_state, Ordering::Release);
646 TOKEN_HANDOFF
647 } else {
648 // Clear the parked bit if there are no more parked threads.
649 if result.have_more_threads {
650 self.state.store(PARKED_BIT, Ordering::Release);
651 } else {
652 self.state.store(0, Ordering::Release);
653 }
654 TOKEN_NORMAL
655 }
656 };
657 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
658 unsafe {
659 self.wake_parked_threads(0, callback);
660 }
661 }
662
663 #[cold]
664 fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
665 let try_lock = |state: &mut usize| {
666 let mut spinwait_shared = SpinWait::new();
667 loop {
668 // Use hardware lock elision to avoid cache conflicts when multiple
669 // readers try to acquire the lock. We only do this if the lock is
670 // completely empty since elision handles conflicts poorly.
671 if have_elision() && *state == 0 {
672 match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
673 Ok(_) => return true,
674 Err(x) => *state = x,
675 }
676 }
677
678 // This is the same condition as try_lock_shared_fast
679 if *state & WRITER_BIT != 0 {
680 if !recursive || *state & READERS_MASK == 0 {
681 return false;
682 }
683 }
684
685 if self
686 .state
687 .compare_exchange_weak(
688 *state,
689 state
690 .checked_add(ONE_READER)
691 .expect("RwLock reader count overflow"),
692 Ordering::Acquire,
693 Ordering::Relaxed,
694 )
695 .is_ok()
696 {
697 return true;
698 }
699
700 // If there is high contention on the reader count then we want
701 // to leave some time between attempts to acquire the lock to
702 // let other threads make progress.
703 spinwait_shared.spin_no_yield();
704 *state = self.state.load(Ordering::Relaxed);
705 }
706 };
707 self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
708 }
709
710 #[cold]
711 fn unlock_shared_slow(&self) {
712 // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We
713 // just need to wake up a potentially sleeping pending writer.
714 // Using the 2nd key at addr + 1
715 let addr = self as *const _ as usize + 1;
716 let callback = |_result: UnparkResult| {
717 // Clear the WRITER_PARKED_BIT here since there can only be one
718 // parked writer thread.
719 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
720 TOKEN_NORMAL
721 };
722 // SAFETY:
723 // * `addr` is an address we control.
724 // * `callback` does not panic or call into any function of `parking_lot`.
725 unsafe {
726 parking_lot_core::unpark_one(addr, callback);
727 }
728 }
729
730 #[cold]
731 fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
732 let try_lock = |state: &mut usize| {
733 let mut spinwait_shared = SpinWait::new();
734 loop {
735 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
736 return false;
737 }
738
739 if self
740 .state
741 .compare_exchange_weak(
742 *state,
743 state
744 .checked_add(ONE_READER | UPGRADABLE_BIT)
745 .expect("RwLock reader count overflow"),
746 Ordering::Acquire,
747 Ordering::Relaxed,
748 )
749 .is_ok()
750 {
751 return true;
752 }
753
754 // If there is high contention on the reader count then we want
755 // to leave some time between attempts to acquire the lock to
756 // let other threads make progress.
757 spinwait_shared.spin_no_yield();
758 *state = self.state.load(Ordering::Relaxed);
759 }
760 };
761 self.lock_common(
762 timeout,
763 TOKEN_UPGRADABLE,
764 try_lock,
765 WRITER_BIT | UPGRADABLE_BIT,
766 )
767 }
768
769 #[cold]
770 fn unlock_upgradable_slow(&self, force_fair: bool) {
771 // Just release the lock if there are no parked threads.
772 let mut state = self.state.load(Ordering::Relaxed);
773 while state & PARKED_BIT == 0 {
774 match self.state.compare_exchange_weak(
775 state,
776 state - (ONE_READER | UPGRADABLE_BIT),
777 Ordering::Release,
778 Ordering::Relaxed,
779 ) {
780 Ok(_) => return,
781 Err(x) => state = x,
782 }
783 }
784
785 // There are threads to unpark. Try to unpark as many as we can.
786 let callback = |new_state, result: UnparkResult| {
787 // If we are using a fair unlock then we should keep the
788 // rwlock locked and hand it off to the unparked threads.
789 let mut state = self.state.load(Ordering::Relaxed);
790 if force_fair || result.be_fair {
791 // Fall back to normal unpark on overflow. Panicking is
792 // not allowed in parking_lot callbacks.
793 while let Some(mut new_state) =
794 (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
795 {
796 if result.have_more_threads {
797 new_state |= PARKED_BIT;
798 } else {
799 new_state &= !PARKED_BIT;
800 }
801 match self.state.compare_exchange_weak(
802 state,
803 new_state,
804 Ordering::Relaxed,
805 Ordering::Relaxed,
806 ) {
807 Ok(_) => return TOKEN_HANDOFF,
808 Err(x) => state = x,
809 }
810 }
811 }
812
813 // Otherwise just release the upgradable lock and update PARKED_BIT.
814 loop {
815 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
816 if result.have_more_threads {
817 new_state |= PARKED_BIT;
818 } else {
819 new_state &= !PARKED_BIT;
820 }
821 match self.state.compare_exchange_weak(
822 state,
823 new_state,
824 Ordering::Relaxed,
825 Ordering::Relaxed,
826 ) {
827 Ok(_) => return TOKEN_NORMAL,
828 Err(x) => state = x,
829 }
830 }
831 };
832 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
833 unsafe {
834 self.wake_parked_threads(0, callback);
835 }
836 }
837
838 #[cold]
839 fn try_upgrade_slow(&self) -> bool {
840 let mut state = self.state.load(Ordering::Relaxed);
841 loop {
842 if state & READERS_MASK != ONE_READER {
843 return false;
844 }
845 match self.state.compare_exchange_weak(
846 state,
847 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
848 Ordering::Relaxed,
849 Ordering::Relaxed,
850 ) {
851 Ok(_) => return true,
852 Err(x) => state = x,
853 }
854 }
855 }
856
857 #[cold]
858 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
859 self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
860 }
861
862 #[cold]
863 fn downgrade_slow(&self) {
864 // We only reach this point if PARKED_BIT is set.
865 let callback = |_, result: UnparkResult| {
866 // Clear the parked bit if there no more parked threads
867 if !result.have_more_threads {
868 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
869 }
870 TOKEN_NORMAL
871 };
872 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
873 unsafe {
874 self.wake_parked_threads(ONE_READER, callback);
875 }
876 }
877
878 #[cold]
879 fn downgrade_to_upgradable_slow(&self) {
880 // We only reach this point if PARKED_BIT is set.
881 let callback = |_, result: UnparkResult| {
882 // Clear the parked bit if there no more parked threads
883 if !result.have_more_threads {
884 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
885 }
886 TOKEN_NORMAL
887 };
888 // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
889 unsafe {
890 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
891 }
892 }
893
894 #[cold]
895 fn bump_shared_slow(&self) {
896 self.unlock_shared();
897 self.lock_shared();
898 }
899
900 #[cold]
901 fn bump_exclusive_slow(&self) {
902 self.deadlock_release();
903 self.unlock_exclusive_slow(true);
904 self.lock_exclusive();
905 }
906
907 #[cold]
908 fn bump_upgradable_slow(&self) {
909 self.deadlock_release();
910 self.unlock_upgradable_slow(true);
911 self.lock_upgradable();
912 }
913
914 /// Common code for waking up parked threads after releasing WRITER_BIT or
915 /// UPGRADABLE_BIT.
916 ///
917 /// # Safety
918 ///
919 /// `callback` must uphold the requirements of the `callback` parameter to
920 /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
921 /// `parking_lot`.
922 #[inline]
923 unsafe fn wake_parked_threads(
924 &self,
925 new_state: usize,
926 callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
927 ) {
928 // We must wake up at least one upgrader or writer if there is one,
929 // otherwise they may end up parked indefinitely since unlock_shared
930 // does not call wake_parked_threads.
931 let new_state = Cell::new(new_state);
932 let addr = self as *const _ as usize;
933 let filter = |ParkToken(token)| {
934 let s = new_state.get();
935
936 // If we are waking up a writer, don't wake anything else.
937 if s & WRITER_BIT != 0 {
938 return FilterOp::Stop;
939 }
940
941 // Otherwise wake *all* readers and one upgrader/writer.
942 if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
943 // Skip writers and upgradable readers if we already have
944 // a writer/upgradable reader.
945 FilterOp::Skip
946 } else {
947 new_state.set(s + token);
948 FilterOp::Unpark
949 }
950 };
951 let callback = |result| callback(new_state.get(), result);
952 // SAFETY:
953 // * `addr` is an address we control.
954 // * `filter` does not panic or call into any function of `parking_lot`.
955 // * `callback` safety responsibility is on caller
956 parking_lot_core::unpark_filter(addr, filter, callback);
957 }
958
959 // Common code for waiting for readers to exit the lock after acquiring
960 // WRITER_BIT.
961 #[inline]
962 fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
963 // At this point WRITER_BIT is already set, we just need to wait for the
964 // remaining readers to exit the lock.
965 let mut spinwait = SpinWait::new();
966 let mut state = self.state.load(Ordering::Relaxed);
967 while state & READERS_MASK != 0 {
968 // Spin a few times to wait for readers to exit
969 if spinwait.spin() {
970 state = self.state.load(Ordering::Relaxed);
971 continue;
972 }
973
974 // Set the parked bit
975 if state & WRITER_PARKED_BIT == 0 {
976 if let Err(x) = self.state.compare_exchange_weak(
977 state,
978 state | WRITER_PARKED_BIT,
979 Ordering::Relaxed,
980 Ordering::Relaxed,
981 ) {
982 state = x;
983 continue;
984 }
985 }
986
987 // Park our thread until we are woken up by an unlock
988 // Using the 2nd key at addr + 1
989 let addr = self as *const _ as usize + 1;
990 let validate = || {
991 let state = self.state.load(Ordering::Relaxed);
992 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
993 };
994 let before_sleep = || {};
995 let timed_out = |_, _| {};
996 // SAFETY:
997 // * `addr` is an address we control.
998 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
999 // * `before_sleep` does not call `park`, nor does it panic.
1000 let park_result = unsafe {
1001 parking_lot_core::park(
1002 addr,
1003 validate,
1004 before_sleep,
1005 timed_out,
1006 TOKEN_EXCLUSIVE,
1007 timeout,
1008 )
1009 };
1010 match park_result {
1011 // We still need to re-check the state if we are unparked
1012 // since a previous writer timing-out could have allowed
1013 // another reader to sneak in before we parked.
1014 ParkResult::Unparked(_) | ParkResult::Invalid => {
1015 state = self.state.load(Ordering::Relaxed);
1016 continue;
1017 }
1018
1019 // Timeout expired
1020 ParkResult::TimedOut => {
1021 // We need to release WRITER_BIT and revert back to
1022 // our previous value. We also wake up any threads that
1023 // might be waiting on WRITER_BIT.
1024 let state = self.state.fetch_add(
1025 prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
1026 Ordering::Relaxed,
1027 );
1028 if state & PARKED_BIT != 0 {
1029 let callback = |_, result: UnparkResult| {
1030 // Clear the parked bit if there no more parked threads
1031 if !result.have_more_threads {
1032 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1033 }
1034 TOKEN_NORMAL
1035 };
1036 // SAFETY: `callback` does not panic or call any function of `parking_lot`.
1037 unsafe {
1038 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
1039 }
1040 }
1041 return false;
1042 }
1043 }
1044 }
1045 true
1046 }
1047
1048 /// Common code for acquiring a lock
1049 #[inline]
1050 fn lock_common(
1051 &self,
1052 timeout: Option<Instant>,
1053 token: ParkToken,
1054 mut try_lock: impl FnMut(&mut usize) -> bool,
1055 validate_flags: usize,
1056 ) -> bool {
1057 let mut spinwait = SpinWait::new();
1058 let mut state = self.state.load(Ordering::Relaxed);
1059 loop {
1060 // Attempt to grab the lock
1061 if try_lock(&mut state) {
1062 return true;
1063 }
1064
1065 // If there are no parked threads, try spinning a few times.
1066 if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
1067 state = self.state.load(Ordering::Relaxed);
1068 continue;
1069 }
1070
1071 // Set the parked bit
1072 if state & PARKED_BIT == 0 {
1073 if let Err(x) = self.state.compare_exchange_weak(
1074 state,
1075 state | PARKED_BIT,
1076 Ordering::Relaxed,
1077 Ordering::Relaxed,
1078 ) {
1079 state = x;
1080 continue;
1081 }
1082 }
1083
1084 // Park our thread until we are woken up by an unlock
1085 let addr = self as *const _ as usize;
1086 let validate = || {
1087 let state = self.state.load(Ordering::Relaxed);
1088 state & PARKED_BIT != 0 && (state & validate_flags != 0)
1089 };
1090 let before_sleep = || {};
1091 let timed_out = |_, was_last_thread| {
1092 // Clear the parked bit if we were the last parked thread
1093 if was_last_thread {
1094 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1095 }
1096 };
1097
1098 // SAFETY:
1099 // * `addr` is an address we control.
1100 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1101 // * `before_sleep` does not call `park`, nor does it panic.
1102 let park_result = unsafe {
1103 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
1104 };
1105 match park_result {
1106 // The thread that unparked us passed the lock on to us
1107 // directly without unlocking it.
1108 ParkResult::Unparked(TOKEN_HANDOFF) => return true,
1109
1110 // We were unparked normally, try acquiring the lock again
1111 ParkResult::Unparked(_) => (),
1112
1113 // The validation function failed, try locking again
1114 ParkResult::Invalid => (),
1115
1116 // Timeout expired
1117 ParkResult::TimedOut => return false,
1118 }
1119
1120 // Loop back and try locking again
1121 spinwait.reset();
1122 state = self.state.load(Ordering::Relaxed);
1123 }
1124 }
1125
1126 #[inline]
1127 fn deadlock_acquire(&self) {
1128 unsafe { deadlock::acquire_resource(self as *const _ as usize) };
1129 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
1130 }
1131
1132 #[inline]
1133 fn deadlock_release(&self) {
1134 unsafe { deadlock::release_resource(self as *const _ as usize) };
1135 unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
1136 }
1137 }