// Manages a single participant in the epoch scheme. This is where all
// of the actual epoch management logic happens!
use std::cell::UnsafeCell;
use std::fmt;
use std::mem;
use std::sync::atomic::{self, AtomicUsize, AtomicBool};
use std::sync::atomic::Ordering::{Relaxed, Acquire, Release, SeqCst};

use mem::epoch::{Atomic, Guard, garbage, global};
use mem::epoch::participants::ParticipantNode;
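
// A sketch of the underlying scheme (the standard epoch-based reclamation
// argument, stated here for orientation): the global epoch only ever advances
// by one step, and a participant pinned inside a critical section holds it
// back. Garbage retired two epochs ago is therefore unreachable from every
// active participant, which is why three garbage generations suffice below.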
/// Thread-local data for epoch participation.
pub struct Participant {
    /// The participant's local epoch.
    epoch: AtomicUsize,

    /// Number of pending uses of `epoch::pin()`; keeping a count allows for
    /// reentrant use of epoch management.
    in_critical: AtomicUsize,

    /// Thread-local garbage tracking
    garbage: UnsafeCell<garbage::Local>,

    /// Is the thread still active? Becomes `false` when the thread exits. This
    /// is ultimately used to free `Participant` records.
    pub active: AtomicBool,

    /// The participant list is coded intrusively; here's the `next` pointer.
    pub next: Atomic<ParticipantNode>,
}
impl fmt::Debug for Participant {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Participant {{ ... }}")
    }
}
// `Participant` contains an `UnsafeCell`, so `Sync` must be asserted manually:
// the `garbage` cell is only ever accessed by the owning thread, while other
// threads touch only the atomic fields.
unsafe impl Sync for Participant {}
const GC_THRESH: usize = 32;
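
// Rough lifecycle, as suggested by the methods below: a thread creates a
// `Participant`, brackets each operation on a shared structure with
// `enter`/`exit`, hands retired pointers to `reclaim`, and calls
// `try_collect` once `should_gc` reports that `GC_THRESH` deferred items have
// accumulated. `migrate_garbage` handles thread exit, when any leftover
// local garbage must outlive this record.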
impl Participant {
    pub fn new() -> Participant {
        Participant {
            epoch: AtomicUsize::new(0),
            in_critical: AtomicUsize::new(0),
            active: AtomicBool::new(true),
            garbage: UnsafeCell::new(garbage::Local::new()),
            next: Atomic::null(),
        }
    }
    /// Enter a critical section.
    ///
    /// This method is reentrant, allowing for nested critical sections.
    ///
    /// Returns `true` if this is the first entry on the stack (as opposed to a
    /// re-entrant call).
    pub fn enter(&self) -> bool {
        // Only the owning thread ever writes `in_critical`, so a plain
        // load/store pair (rather than an atomic read-modify-write) suffices.
        let new_count = self.in_critical.load(Relaxed) + 1;
        self.in_critical.store(new_count, Relaxed);
        if new_count > 1 { return false }
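
        // Why the fence is SeqCst (a sketch of the usual epoch argument): it
        // globally orders the `in_critical` store above before the epoch load
        // below, so a concurrent `try_collect` either observes this
        // participant as pinned, or this participant observes the epoch that
        // collector is about to advance from.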
        atomic::fence(SeqCst);
        let global_epoch = global::get().epoch.load(Relaxed);
        if global_epoch != self.epoch.load(Relaxed) {
            self.epoch.store(global_epoch, Relaxed);
            // Having caught up to a fresh epoch, the thread-local garbage
            // bags can be collected.
            unsafe { (*self.garbage.get()).collect(); }
        }

        true
    }
    /// Exit the current (nested) critical section.
    pub fn exit(&self) {
        let new_count = self.in_critical.load(Relaxed) - 1;
        self.in_critical.store(
            new_count,
            // The outermost exit uses `Release` so that everything done
            // inside the critical section is visible before other threads
            // see this participant as unpinned.
            if new_count > 0 { Relaxed } else { Release });
    }
    /// Begin the reclamation process for a piece of data.
    pub unsafe fn reclaim<T>(&self, data: *mut T) {
        (*self.garbage.get()).insert(data);
    }
    /// Attempt to collect garbage by moving the global epoch forward.
    ///
    /// Returns `true` on success.
    pub fn try_collect(&self, guard: &Guard) -> bool {
        let cur_epoch = global::get().epoch.load(SeqCst);
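
        // The epoch can advance only if every pinned participant has already
        // caught up to `cur_epoch`: a pinned thread still in an older epoch
        // may hold references to that epoch's garbage.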
        for p in global::get().participants.iter(guard) {
            if p.in_critical.load(Relaxed) > 0 && p.epoch.load(Relaxed) != cur_epoch {
                return false
            }
        }
        let new_epoch = cur_epoch.wrapping_add(1);
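
        // A sketch of the intent behind the `Acquire` fence below: it orders
        // the participant scan above before the CAS on the global epoch, so
        // the epoch is advanced only on the strength of observations made
        // while it was still current.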
        atomic::fence(Acquire);
        if global::get().epoch.compare_and_swap(cur_epoch, new_epoch, SeqCst) != cur_epoch {
            return false
        }
        // This thread won the race to advance the epoch. The bag indexed by
        // `(new_epoch + 1) % 3` holds garbage from two epochs before
        // `new_epoch`, which no participant can still reference, so it is
        // collected along with the local garbage.
        unsafe {
            (*self.garbage.get()).collect();
            global::get().garbage[new_epoch.wrapping_add(1) % 3].collect();
        }
        self.epoch.store(new_epoch, Release);

        true
    }
    /// Move the current thread-local garbage into the global garbage bags.
    pub fn migrate_garbage(&self) {
        let cur_epoch = self.epoch.load(Relaxed);
        let local = unsafe { mem::replace(&mut *self.garbage.get(), garbage::Local::new()) };
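
        // The local garbage is held in three generations (`old`, `cur`,
        // `new`); each is merged into the global bag for its epoch so that a
        // later `try_collect` frees it no earlier than this thread would
        // have. Note that `new` is filed under the *global* epoch, which may
        // already be ahead of this participant's local one.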
        global::get().garbage[cur_epoch.wrapping_sub(1) % 3].insert(local.old);
        global::get().garbage[cur_epoch % 3].insert(local.cur);
        global::get().garbage[global::get().epoch.load(Relaxed) % 3].insert(local.new);
    }
    /// How much garbage is this participant currently storing?
    pub fn garbage_size(&self) -> usize {
        unsafe { (*self.garbage.get()).size() }
    }
    /// Is this participant past its local GC threshold?
    pub fn should_gc(&self) -> bool {
        self.garbage_size() >= GC_THRESH
    }
}
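
// Roughly how `epoch::pin()` is expected to drive this type (a sketch based
// on the API above, not a verbatim copy of the real caller):
//
//     let p: &Participant = /* this thread's participant record */;
//     p.enter();                      // pin the thread
//     let guard = /* the `Guard` tied to this pinning */;
//     if p.should_gc() {
//         p.try_collect(&guard);      // opportunistically advance the epoch
//     }
//     // ... the caller works with the shared structure, retiring unlinked
//     // nodes via `unsafe { p.reclaim(ptr) }` ...
//     p.exit();                       // unpin when the guard is dropped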