]>
Commit | Line | Data |
---|---|---|
416331ca XL |
1 | //! The global data and participant for garbage collection. |
2 | //! | |
3 | //! # Registration | |
4 | //! | |
5 | //! In order to track all participants in one place, we need some form of participant | |
6 | //! registration. When a participant is created, it is registered to a global lock-free | |
7 | //! singly-linked list of registries; and when a participant is leaving, it is unregistered from the | |
8 | //! list. | |
9 | //! | |
10 | //! # Pinning | |
11 | //! | |
12 | //! Every participant contains an integer that tells whether the participant is pinned and if so, | |
13 | //! what was the global epoch at the time it was pinned. Participants also hold a pin counter that | |
14 | //! aids in periodic global epoch advancement. | |
15 | //! | |
16 | //! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned. | |
17 | //! Guards are necessary for performing atomic operations, and for freeing/dropping locations. | |
18 | //! | |
19 | //! # Thread-local bag | |
20 | //! | |
21 | //! Objects that get unlinked from concurrent data structures must be stashed away until the global | |
22 | //! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects | |
23 | //! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current | |
24 | //! global epoch and pushed into the global queue of bags. We store objects in thread-local storages | |
25 | //! for amortizing the synchronization cost of pushing the garbages to a global queue. | |
26 | //! | |
27 | //! # Global queue | |
28 | //! | |
29 | //! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and | |
30 | //! destroyed along the way. This design reduces contention on data structures. The global queue | |
31 | //! cannot be explicitly accessed: the only way to interact with it is by calling functions | |
5869c6ff | 32 | //! `defer()` that adds an object to the thread-local bag, or `collect()` that manually triggers |
416331ca XL |
33 | //! garbage collection. |
34 | //! | |
35 | //! Ideally each instance of concurrent data structure may have its own queue that gets fully | |
36 | //! destroyed as soon as the data structure gets dropped. | |
37 | ||
6a06907d XL |
38 | use crate::primitive::cell::UnsafeCell; |
39 | use crate::primitive::sync::atomic; | |
40 | use core::cell::Cell; | |
416331ca XL |
41 | use core::mem::{self, ManuallyDrop}; |
42 | use core::num::Wrapping; | |
416331ca | 43 | use core::sync::atomic::Ordering; |
f035d41b | 44 | use core::{fmt, ptr}; |
416331ca | 45 | |
416331ca | 46 | use crossbeam_utils::CachePadded; |
5869c6ff | 47 | use memoffset::offset_of; |
416331ca | 48 | |
5869c6ff XL |
49 | use crate::atomic::{Owned, Shared}; |
50 | use crate::collector::{Collector, LocalHandle}; | |
51 | use crate::deferred::Deferred; | |
52 | use crate::epoch::{AtomicEpoch, Epoch}; | |
53 | use crate::guard::{unprotected, Guard}; | |
54 | use crate::sync::list::{Entry, IsElement, IterError, List}; | |
55 | use crate::sync::queue::Queue; | |
416331ca XL |
56 | |
57 | /// Maximum number of objects a bag can contain. | |
6a06907d | 58 | #[cfg(not(crossbeam_sanitize))] |
5869c6ff | 59 | const MAX_OBJECTS: usize = 62; |
6a06907d | 60 | #[cfg(crossbeam_sanitize)] |
416331ca XL |
61 | const MAX_OBJECTS: usize = 4; |
62 | ||
63 | /// A bag of deferred functions. | |
6a06907d | 64 | pub(crate) struct Bag { |
416331ca | 65 | /// Stashed objects. |
f035d41b XL |
66 | deferreds: [Deferred; MAX_OBJECTS], |
67 | len: usize, | |
416331ca XL |
68 | } |
69 | ||
70 | /// `Bag::try_push()` requires that it is safe for another thread to execute the given functions. | |
71 | unsafe impl Send for Bag {} | |
72 | ||
73 | impl Bag { | |
74 | /// Returns a new, empty bag. | |
6a06907d | 75 | pub(crate) fn new() -> Self { |
416331ca XL |
76 | Self::default() |
77 | } | |
78 | ||
79 | /// Returns `true` if the bag is empty. | |
6a06907d | 80 | pub(crate) fn is_empty(&self) -> bool { |
f035d41b | 81 | self.len == 0 |
416331ca XL |
82 | } |
83 | ||
84 | /// Attempts to insert a deferred function into the bag. | |
85 | /// | |
86 | /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is | |
87 | /// full. | |
88 | /// | |
89 | /// # Safety | |
90 | /// | |
91 | /// It should be safe for another thread to execute the given function. | |
6a06907d | 92 | pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> { |
f035d41b XL |
93 | if self.len < MAX_OBJECTS { |
94 | self.deferreds[self.len] = deferred; | |
95 | self.len += 1; | |
96 | Ok(()) | |
97 | } else { | |
98 | Err(deferred) | |
99 | } | |
416331ca XL |
100 | } |
101 | ||
102 | /// Seals the bag with the given epoch. | |
103 | fn seal(self, epoch: Epoch) -> SealedBag { | |
104 | SealedBag { epoch, bag: self } | |
105 | } | |
106 | } | |
107 | ||
f035d41b | 108 | impl Default for Bag { |
5869c6ff | 109 | #[rustfmt::skip] |
f035d41b XL |
110 | fn default() -> Self { |
111 | // TODO: [no_op; MAX_OBJECTS] syntax blocked by https://github.com/rust-lang/rust/issues/49147 | |
6a06907d | 112 | #[cfg(not(crossbeam_sanitize))] |
f035d41b XL |
113 | return Bag { |
114 | len: 0, | |
115 | deferreds: [ | |
116 | Deferred::new(no_op_func), | |
117 | Deferred::new(no_op_func), | |
118 | Deferred::new(no_op_func), | |
119 | Deferred::new(no_op_func), | |
120 | Deferred::new(no_op_func), | |
121 | Deferred::new(no_op_func), | |
122 | Deferred::new(no_op_func), | |
123 | Deferred::new(no_op_func), | |
124 | Deferred::new(no_op_func), | |
125 | Deferred::new(no_op_func), | |
126 | Deferred::new(no_op_func), | |
127 | Deferred::new(no_op_func), | |
128 | Deferred::new(no_op_func), | |
129 | Deferred::new(no_op_func), | |
130 | Deferred::new(no_op_func), | |
131 | Deferred::new(no_op_func), | |
132 | Deferred::new(no_op_func), | |
133 | Deferred::new(no_op_func), | |
134 | Deferred::new(no_op_func), | |
135 | Deferred::new(no_op_func), | |
136 | Deferred::new(no_op_func), | |
137 | Deferred::new(no_op_func), | |
138 | Deferred::new(no_op_func), | |
139 | Deferred::new(no_op_func), | |
140 | Deferred::new(no_op_func), | |
141 | Deferred::new(no_op_func), | |
142 | Deferred::new(no_op_func), | |
143 | Deferred::new(no_op_func), | |
144 | Deferred::new(no_op_func), | |
145 | Deferred::new(no_op_func), | |
146 | Deferred::new(no_op_func), | |
147 | Deferred::new(no_op_func), | |
148 | Deferred::new(no_op_func), | |
149 | Deferred::new(no_op_func), | |
150 | Deferred::new(no_op_func), | |
151 | Deferred::new(no_op_func), | |
152 | Deferred::new(no_op_func), | |
153 | Deferred::new(no_op_func), | |
154 | Deferred::new(no_op_func), | |
155 | Deferred::new(no_op_func), | |
156 | Deferred::new(no_op_func), | |
157 | Deferred::new(no_op_func), | |
158 | Deferred::new(no_op_func), | |
159 | Deferred::new(no_op_func), | |
160 | Deferred::new(no_op_func), | |
161 | Deferred::new(no_op_func), | |
162 | Deferred::new(no_op_func), | |
163 | Deferred::new(no_op_func), | |
164 | Deferred::new(no_op_func), | |
165 | Deferred::new(no_op_func), | |
166 | Deferred::new(no_op_func), | |
167 | Deferred::new(no_op_func), | |
168 | Deferred::new(no_op_func), | |
169 | Deferred::new(no_op_func), | |
170 | Deferred::new(no_op_func), | |
171 | Deferred::new(no_op_func), | |
172 | Deferred::new(no_op_func), | |
173 | Deferred::new(no_op_func), | |
174 | Deferred::new(no_op_func), | |
175 | Deferred::new(no_op_func), | |
176 | Deferred::new(no_op_func), | |
177 | Deferred::new(no_op_func), | |
f035d41b XL |
178 | ], |
179 | }; | |
6a06907d | 180 | #[cfg(crossbeam_sanitize)] |
f035d41b XL |
181 | return Bag { |
182 | len: 0, | |
183 | deferreds: [ | |
184 | Deferred::new(no_op_func), | |
185 | Deferred::new(no_op_func), | |
186 | Deferred::new(no_op_func), | |
187 | Deferred::new(no_op_func), | |
188 | ], | |
189 | }; | |
190 | } | |
191 | } | |
192 | ||
416331ca XL |
193 | impl Drop for Bag { |
194 | fn drop(&mut self) { | |
195 | // Call all deferred functions. | |
f035d41b XL |
196 | for deferred in &mut self.deferreds[..self.len] { |
197 | let no_op = Deferred::new(no_op_func); | |
198 | let owned_deferred = mem::replace(deferred, no_op); | |
199 | owned_deferred.call(); | |
416331ca XL |
200 | } |
201 | } | |
202 | } | |
203 | ||
f035d41b XL |
204 | // can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long |
205 | impl fmt::Debug for Bag { | |
5869c6ff | 206 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
f035d41b XL |
207 | f.debug_struct("Bag") |
208 | .field("deferreds", &&self.deferreds[..self.len]) | |
209 | .finish() | |
210 | } | |
211 | } | |
212 | ||
213 | fn no_op_func() {} | |
214 | ||
416331ca XL |
215 | /// A pair of an epoch and a bag. |
216 | #[derive(Default, Debug)] | |
217 | struct SealedBag { | |
218 | epoch: Epoch, | |
219 | bag: Bag, | |
220 | } | |
221 | ||
222 | /// It is safe to share `SealedBag` because `is_expired` only inspects the epoch. | |
223 | unsafe impl Sync for SealedBag {} | |
224 | ||
225 | impl SealedBag { | |
226 | /// Checks if it is safe to drop the bag w.r.t. the given global epoch. | |
227 | fn is_expired(&self, global_epoch: Epoch) -> bool { | |
228 | // A pinned participant can witness at most one epoch advancement. Therefore, any bag that | |
229 | // is within one epoch of the current one cannot be destroyed yet. | |
230 | global_epoch.wrapping_sub(self.epoch) >= 2 | |
231 | } | |
232 | } | |
233 | ||
234 | /// The global data for a garbage collector. | |
6a06907d | 235 | pub(crate) struct Global { |
416331ca XL |
236 | /// The intrusive linked list of `Local`s. |
237 | locals: List<Local>, | |
238 | ||
239 | /// The global queue of bags of deferred functions. | |
240 | queue: Queue<SealedBag>, | |
241 | ||
242 | /// The global epoch. | |
243 | pub(crate) epoch: CachePadded<AtomicEpoch>, | |
244 | } | |
245 | ||
246 | impl Global { | |
247 | /// Number of bags to destroy. | |
248 | const COLLECT_STEPS: usize = 8; | |
249 | ||
250 | /// Creates a new global data for garbage collection. | |
251 | #[inline] | |
6a06907d | 252 | pub(crate) fn new() -> Self { |
416331ca XL |
253 | Self { |
254 | locals: List::new(), | |
255 | queue: Queue::new(), | |
256 | epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), | |
257 | } | |
258 | } | |
259 | ||
260 | /// Pushes the bag into the global queue and replaces the bag with a new empty bag. | |
6a06907d | 261 | pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) { |
416331ca XL |
262 | let bag = mem::replace(bag, Bag::new()); |
263 | ||
264 | atomic::fence(Ordering::SeqCst); | |
265 | ||
266 | let epoch = self.epoch.load(Ordering::Relaxed); | |
267 | self.queue.push(bag.seal(epoch), guard); | |
268 | } | |
269 | ||
270 | /// Collects several bags from the global queue and executes deferred functions in them. | |
271 | /// | |
272 | /// Note: This may itself produce garbage and in turn allocate new bags. | |
273 | /// | |
274 | /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold | |
275 | /// path. In other words, we want the compiler to optimize branching for the case when | |
276 | /// `collect()` is not called. | |
277 | #[cold] | |
6a06907d | 278 | pub(crate) fn collect(&self, guard: &Guard) { |
416331ca XL |
279 | let global_epoch = self.try_advance(guard); |
280 | ||
6a06907d | 281 | let steps = if cfg!(crossbeam_sanitize) { |
416331ca XL |
282 | usize::max_value() |
283 | } else { | |
284 | Self::COLLECT_STEPS | |
285 | }; | |
286 | ||
287 | for _ in 0..steps { | |
288 | match self.queue.try_pop_if( | |
289 | &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch), | |
290 | guard, | |
291 | ) { | |
292 | None => break, | |
293 | Some(sealed_bag) => drop(sealed_bag), | |
294 | } | |
295 | } | |
296 | } | |
297 | ||
298 | /// Attempts to advance the global epoch. | |
299 | /// | |
300 | /// The global epoch can advance only if all currently pinned participants have been pinned in | |
301 | /// the current epoch. | |
302 | /// | |
303 | /// Returns the current global epoch. | |
304 | /// | |
305 | /// `try_advance()` is annotated `#[cold]` because it is rarely called. | |
306 | #[cold] | |
6a06907d | 307 | pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch { |
416331ca XL |
308 | let global_epoch = self.epoch.load(Ordering::Relaxed); |
309 | atomic::fence(Ordering::SeqCst); | |
310 | ||
311 | // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly | |
312 | // easy to implement in a lock-free manner. However, traversal can be slow due to cache | |
313 | // misses and data dependencies. We should experiment with other data structures as well. | |
314 | for local in self.locals.iter(&guard) { | |
315 | match local { | |
316 | Err(IterError::Stalled) => { | |
317 | // A concurrent thread stalled this iteration. That thread might also try to | |
318 | // advance the epoch, in which case we leave the job to it. Otherwise, the | |
319 | // epoch will not be advanced. | |
320 | return global_epoch; | |
321 | } | |
322 | Ok(local) => { | |
323 | let local_epoch = local.epoch.load(Ordering::Relaxed); | |
324 | ||
325 | // If the participant was pinned in a different epoch, we cannot advance the | |
326 | // global epoch just yet. | |
327 | if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { | |
328 | return global_epoch; | |
329 | } | |
330 | } | |
331 | } | |
332 | } | |
333 | atomic::fence(Ordering::Acquire); | |
334 | ||
335 | // All pinned participants were pinned in the current global epoch. | |
336 | // Now let's advance the global epoch... | |
337 | // | |
338 | // Note that if another thread already advanced it before us, this store will simply | |
339 | // overwrite the global epoch with the same value. This is true because `try_advance` was | |
340 | // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be | |
341 | // advanced two steps ahead of it. | |
342 | let new_epoch = global_epoch.successor(); | |
343 | self.epoch.store(new_epoch, Ordering::Release); | |
344 | new_epoch | |
345 | } | |
346 | } | |
347 | ||
348 | /// Participant for garbage collection. | |
6a06907d | 349 | pub(crate) struct Local { |
416331ca XL |
350 | /// A node in the intrusive linked list of `Local`s. |
351 | entry: Entry, | |
352 | ||
353 | /// The local epoch. | |
354 | epoch: AtomicEpoch, | |
355 | ||
356 | /// A reference to the global data. | |
357 | /// | |
358 | /// When all guards and handles get dropped, this reference is destroyed. | |
359 | collector: UnsafeCell<ManuallyDrop<Collector>>, | |
360 | ||
361 | /// The local bag of deferred functions. | |
362 | pub(crate) bag: UnsafeCell<Bag>, | |
363 | ||
364 | /// The number of guards keeping this participant pinned. | |
365 | guard_count: Cell<usize>, | |
366 | ||
367 | /// The number of active handles. | |
368 | handle_count: Cell<usize>, | |
369 | ||
370 | /// Total number of pinnings performed. | |
371 | /// | |
5869c6ff | 372 | /// This is just an auxiliary counter that sometimes kicks off collection. |
416331ca XL |
373 | pin_count: Cell<Wrapping<usize>>, |
374 | } | |
375 | ||
5869c6ff XL |
376 | // Make sure `Local` is less than or equal to 2048 bytes. |
377 | // https://github.com/crossbeam-rs/crossbeam/issues/551 | |
6a06907d | 378 | #[cfg(not(crossbeam_sanitize))] // `crossbeam_sanitize` reduces the size of `Local` |
5869c6ff XL |
379 | #[test] |
380 | fn local_size() { | |
6a06907d XL |
381 | assert!( |
382 | core::mem::size_of::<Local>() <= 2048, | |
383 | "An allocation of `Local` should be <= 2048 bytes." | |
384 | ); | |
5869c6ff XL |
385 | } |
386 | ||
416331ca XL |
387 | impl Local { |
388 | /// Number of pinnings after which a participant will execute some deferred functions from the | |
389 | /// global queue. | |
390 | const PINNINGS_BETWEEN_COLLECT: usize = 128; | |
391 | ||
392 | /// Registers a new `Local` in the provided `Global`. | |
6a06907d | 393 | pub(crate) fn register(collector: &Collector) -> LocalHandle { |
416331ca XL |
394 | unsafe { |
395 | // Since we dereference no pointers in this block, it is safe to use `unprotected`. | |
396 | ||
397 | let local = Owned::new(Local { | |
398 | entry: Entry::default(), | |
399 | epoch: AtomicEpoch::new(Epoch::starting()), | |
400 | collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())), | |
401 | bag: UnsafeCell::new(Bag::new()), | |
402 | guard_count: Cell::new(0), | |
403 | handle_count: Cell::new(1), | |
404 | pin_count: Cell::new(Wrapping(0)), | |
405 | }) | |
5869c6ff XL |
406 | .into_shared(unprotected()); |
407 | collector.global.locals.insert(local, unprotected()); | |
416331ca XL |
408 | LocalHandle { |
409 | local: local.as_raw(), | |
410 | } | |
411 | } | |
412 | } | |
413 | ||
414 | /// Returns a reference to the `Global` in which this `Local` resides. | |
415 | #[inline] | |
6a06907d | 416 | pub(crate) fn global(&self) -> &Global { |
416331ca XL |
417 | &self.collector().global |
418 | } | |
419 | ||
420 | /// Returns a reference to the `Collector` in which this `Local` resides. | |
421 | #[inline] | |
6a06907d XL |
422 | pub(crate) fn collector(&self) -> &Collector { |
423 | self.collector.with(|c| unsafe { &**c }) | |
416331ca XL |
424 | } |
425 | ||
426 | /// Returns `true` if the current participant is pinned. | |
427 | #[inline] | |
6a06907d | 428 | pub(crate) fn is_pinned(&self) -> bool { |
416331ca XL |
429 | self.guard_count.get() > 0 |
430 | } | |
431 | ||
432 | /// Adds `deferred` to the thread-local bag. | |
433 | /// | |
434 | /// # Safety | |
435 | /// | |
436 | /// It should be safe for another thread to execute the given function. | |
6a06907d XL |
437 | pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) { |
438 | let bag = self.bag.with_mut(|b| &mut *b); | |
416331ca XL |
439 | |
440 | while let Err(d) = bag.try_push(deferred) { | |
441 | self.global().push_bag(bag, guard); | |
442 | deferred = d; | |
443 | } | |
444 | } | |
445 | ||
6a06907d XL |
446 | pub(crate) fn flush(&self, guard: &Guard) { |
447 | let bag = self.bag.with_mut(|b| unsafe { &mut *b }); | |
416331ca XL |
448 | |
449 | if !bag.is_empty() { | |
450 | self.global().push_bag(bag, guard); | |
451 | } | |
452 | ||
453 | self.global().collect(guard); | |
454 | } | |
455 | ||
456 | /// Pins the `Local`. | |
457 | #[inline] | |
6a06907d | 458 | pub(crate) fn pin(&self) -> Guard { |
416331ca XL |
459 | let guard = Guard { local: self }; |
460 | ||
461 | let guard_count = self.guard_count.get(); | |
462 | self.guard_count.set(guard_count.checked_add(1).unwrap()); | |
463 | ||
464 | if guard_count == 0 { | |
465 | let global_epoch = self.global().epoch.load(Ordering::Relaxed); | |
466 | let new_epoch = global_epoch.pinned(); | |
467 | ||
468 | // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence. | |
469 | // The fence makes sure that any future loads from `Atomic`s will not happen before | |
470 | // this store. | |
471 | if cfg!(any(target_arch = "x86", target_arch = "x86_64")) { | |
472 | // HACK(stjepang): On x86 architectures there are two different ways of executing | |
473 | // a `SeqCst` fence. | |
474 | // | |
475 | // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. | |
6a06907d | 476 | // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` |
416331ca XL |
477 | // instruction. |
478 | // | |
479 | // Both instructions have the effect of a full barrier, but benchmarks have shown | |
480 | // that the second one makes pinning faster in this particular case. It is not | |
481 | // clear that this is permitted by the C++ memory model (SC fences work very | |
482 | // differently from SC accesses), but experimental evidence suggests that this | |
483 | // works fine. Using inline assembly would be a viable (and correct) alternative, | |
484 | // but alas, that is not possible on stable Rust. | |
485 | let current = Epoch::starting(); | |
6a06907d XL |
486 | let res = self.epoch.compare_exchange( |
487 | current, | |
488 | new_epoch, | |
489 | Ordering::SeqCst, | |
490 | Ordering::SeqCst, | |
491 | ); | |
492 | debug_assert!(res.is_ok(), "participant was expected to be unpinned"); | |
416331ca XL |
493 | // We add a compiler fence to make it less likely for LLVM to do something wrong |
494 | // here. Formally, this is not enough to get rid of data races; practically, | |
495 | // it should go a long way. | |
496 | atomic::compiler_fence(Ordering::SeqCst); | |
497 | } else { | |
498 | self.epoch.store(new_epoch, Ordering::Relaxed); | |
499 | atomic::fence(Ordering::SeqCst); | |
500 | } | |
501 | ||
502 | // Increment the pin counter. | |
503 | let count = self.pin_count.get(); | |
504 | self.pin_count.set(count + Wrapping(1)); | |
505 | ||
506 | // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting | |
507 | // some garbage. | |
508 | if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 { | |
509 | self.global().collect(&guard); | |
510 | } | |
511 | } | |
512 | ||
513 | guard | |
514 | } | |
515 | ||
516 | /// Unpins the `Local`. | |
517 | #[inline] | |
6a06907d | 518 | pub(crate) fn unpin(&self) { |
416331ca XL |
519 | let guard_count = self.guard_count.get(); |
520 | self.guard_count.set(guard_count - 1); | |
521 | ||
522 | if guard_count == 1 { | |
523 | self.epoch.store(Epoch::starting(), Ordering::Release); | |
524 | ||
525 | if self.handle_count.get() == 0 { | |
526 | self.finalize(); | |
527 | } | |
528 | } | |
529 | } | |
530 | ||
531 | /// Unpins and then pins the `Local`. | |
532 | #[inline] | |
6a06907d | 533 | pub(crate) fn repin(&self) { |
416331ca XL |
534 | let guard_count = self.guard_count.get(); |
535 | ||
536 | // Update the local epoch only if there's only one guard. | |
537 | if guard_count == 1 { | |
538 | let epoch = self.epoch.load(Ordering::Relaxed); | |
539 | let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned(); | |
540 | ||
541 | // Update the local epoch only if the global epoch is greater than the local epoch. | |
542 | if epoch != global_epoch { | |
543 | // We store the new epoch with `Release` because we need to ensure any memory | |
544 | // accesses from the previous epoch do not leak into the new one. | |
545 | self.epoch.store(global_epoch, Ordering::Release); | |
546 | ||
547 | // However, we don't need a following `SeqCst` fence, because it is safe for memory | |
548 | // accesses from the new epoch to be executed before updating the local epoch. At | |
549 | // worse, other threads will see the new epoch late and delay GC slightly. | |
550 | } | |
551 | } | |
552 | } | |
553 | ||
554 | /// Increments the handle count. | |
555 | #[inline] | |
6a06907d | 556 | pub(crate) fn acquire_handle(&self) { |
416331ca XL |
557 | let handle_count = self.handle_count.get(); |
558 | debug_assert!(handle_count >= 1); | |
559 | self.handle_count.set(handle_count + 1); | |
560 | } | |
561 | ||
562 | /// Decrements the handle count. | |
563 | #[inline] | |
6a06907d | 564 | pub(crate) fn release_handle(&self) { |
416331ca XL |
565 | let guard_count = self.guard_count.get(); |
566 | let handle_count = self.handle_count.get(); | |
567 | debug_assert!(handle_count >= 1); | |
568 | self.handle_count.set(handle_count - 1); | |
569 | ||
570 | if guard_count == 0 && handle_count == 1 { | |
571 | self.finalize(); | |
572 | } | |
573 | } | |
574 | ||
575 | /// Removes the `Local` from the global linked list. | |
576 | #[cold] | |
577 | fn finalize(&self) { | |
578 | debug_assert_eq!(self.guard_count.get(), 0); | |
579 | debug_assert_eq!(self.handle_count.get(), 0); | |
580 | ||
581 | // Temporarily increment handle count. This is required so that the following call to `pin` | |
582 | // doesn't call `finalize` again. | |
583 | self.handle_count.set(1); | |
584 | unsafe { | |
585 | // Pin and move the local bag into the global queue. It's important that `push_bag` | |
586 | // doesn't defer destruction on any new garbage. | |
587 | let guard = &self.pin(); | |
6a06907d XL |
588 | self.global() |
589 | .push_bag(self.bag.with_mut(|b| &mut *b), guard); | |
416331ca XL |
590 | } |
591 | // Revert the handle count back to zero. | |
592 | self.handle_count.set(0); | |
593 | ||
594 | unsafe { | |
595 | // Take the reference to the `Global` out of this `Local`. Since we're not protected | |
596 | // by a guard at this time, it's crucial that the reference is read before marking the | |
597 | // `Local` as deleted. | |
6a06907d | 598 | let collector: Collector = ptr::read(self.collector.with(|c| &*(*c))); |
416331ca XL |
599 | |
600 | // Mark this node in the linked list as deleted. | |
5869c6ff | 601 | self.entry.delete(unprotected()); |
416331ca XL |
602 | |
603 | // Finally, drop the reference to the global. Note that this might be the last reference | |
604 | // to the `Global`. If so, the global data will be destroyed and all deferred functions | |
605 | // in its queue will be executed. | |
606 | drop(collector); | |
607 | } | |
608 | } | |
609 | } | |
610 | ||
611 | impl IsElement<Local> for Local { | |
612 | fn entry_of(local: &Local) -> &Entry { | |
613 | let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry; | |
614 | unsafe { &*entry_ptr } | |
615 | } | |
616 | ||
617 | unsafe fn element_of(entry: &Entry) -> &Local { | |
618 | // offset_of! macro uses unsafe, but it's unnecessary in this context. | |
619 | #[allow(unused_unsafe)] | |
620 | let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local; | |
621 | &*local_ptr | |
622 | } | |
623 | ||
624 | unsafe fn finalize(entry: &Entry, guard: &Guard) { | |
625 | guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _)); | |
626 | } | |
627 | } | |
628 | ||
6a06907d | 629 | #[cfg(all(test, not(crossbeam_loom)))] |
416331ca XL |
630 | mod tests { |
631 | use std::sync::atomic::{AtomicUsize, Ordering}; | |
632 | ||
633 | use super::*; | |
634 | ||
635 | #[test] | |
636 | fn check_defer() { | |
637 | static FLAG: AtomicUsize = AtomicUsize::new(0); | |
638 | fn set() { | |
639 | FLAG.store(42, Ordering::Relaxed); | |
640 | } | |
641 | ||
642 | let d = Deferred::new(set); | |
643 | assert_eq!(FLAG.load(Ordering::Relaxed), 0); | |
644 | d.call(); | |
645 | assert_eq!(FLAG.load(Ordering::Relaxed), 42); | |
646 | } | |
647 | ||
648 | #[test] | |
649 | fn check_bag() { | |
650 | static FLAG: AtomicUsize = AtomicUsize::new(0); | |
651 | fn incr() { | |
652 | FLAG.fetch_add(1, Ordering::Relaxed); | |
653 | } | |
654 | ||
655 | let mut bag = Bag::new(); | |
656 | assert!(bag.is_empty()); | |
657 | ||
658 | for _ in 0..MAX_OBJECTS { | |
659 | assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() }); | |
660 | assert!(!bag.is_empty()); | |
661 | assert_eq!(FLAG.load(Ordering::Relaxed), 0); | |
662 | } | |
663 | ||
664 | let result = unsafe { bag.try_push(Deferred::new(incr)) }; | |
665 | assert!(result.is_err()); | |
666 | assert!(!bag.is_empty()); | |
667 | assert_eq!(FLAG.load(Ordering::Relaxed), 0); | |
668 | ||
669 | drop(bag); | |
670 | assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS); | |
671 | } | |
672 | } |