use prelude::v1::*;
+use alloc::boxed::FnBox;
use any::Any;
use cell::UnsafeCell;
use fmt;
use sync::{Mutex, Condvar, Arc};
use sys::thread as imp;
use sys_common::{stack, thread_info};
-use thunk::Thunk;
use time::Duration;
////////////////////////////////////////////////////////////////////////////////
pub fn spawn<F, T>(self, f: F) -> io::Result<JoinHandle<T>> where
F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
{
- self.spawn_inner(Box::new(f)).map(|i| JoinHandle(i))
+ unsafe {
+ self.spawn_inner(Box::new(f)).map(JoinHandle)
+ }
}
/// Spawns a new child thread that must be joined within a given
pub fn scoped<'a, T, F>(self, f: F) -> io::Result<JoinGuard<'a, T>> where
T: Send + 'a, F: FnOnce() -> T, F: Send + 'a
{
- self.spawn_inner(Box::new(f)).map(|inner| {
- JoinGuard { inner: inner, _marker: PhantomData }
- })
+ unsafe {
+ self.spawn_inner(Box::new(f)).map(|inner| {
+ JoinGuard { inner: inner, _marker: PhantomData }
+ })
+ }
}
- fn spawn_inner<T: Send>(self, f: Thunk<(), T>) -> io::Result<JoinInner<T>> {
+ // NB: this function is unsafe as the lifetime parameter of the code to run
+ // in the new thread is not tied into the return value, and the return
+ // value must not outlast that lifetime.
+ unsafe fn spawn_inner<'a, T: Send>(self, f: Box<FnBox() -> T + Send + 'a>)
+ -> io::Result<JoinInner<T>> {
let Builder { name, stack_size } = self;
let stack_size = stack_size.unwrap_or(rt::min_stack());
let my_thread = Thread::new(name);
let their_thread = my_thread.clone();
- let my_packet = Packet(Arc::new(UnsafeCell::new(None)));
- let their_packet = Packet(my_packet.0.clone());
+ let my_packet = Arc::new(UnsafeCell::new(None));
+ let their_packet = my_packet.clone();
// Spawning a new OS thread guarantees that __morestack will never get
// triggered, but we must manually set up the actual stack bounds once
let addr = &something_around_the_top_of_the_stack as *const i32;
let my_stack_top = addr as usize;
let my_stack_bottom = my_stack_top - stack_size + 1024;
- unsafe {
- if let Some(name) = their_thread.name() {
- imp::set_name(name);
- }
- stack::record_os_managed_stack_bounds(my_stack_bottom,
- my_stack_top);
- thread_info::set(imp::guard::current(), their_thread);
+ stack::record_os_managed_stack_bounds(my_stack_bottom, my_stack_top);
+
+ if let Some(name) = their_thread.name() {
+ imp::Thread::set_name(name);
}
+ thread_info::set(imp::guard::current(), their_thread);
- let mut output: Option<T> = None;
+ let mut output = None;
let try_result = {
let ptr = &mut output;
-
- // There are two primary reasons that general try/catch is
- // unsafe. The first is that we do not support nested
- // try/catch. The fact that this is happening in a newly-spawned
- // thread suffices. The second is that unwinding while unwinding
- // is not defined. We take care of that by having an
- // 'unwinding' flag in the thread itself. For these reasons,
- // this unsafety should be ok.
- unsafe {
- unwind::try(move || {
- let f: Thunk<(), T> = f;
- let v: T = f();
- *ptr = Some(v)
- })
- }
+ unwind::try(move || *ptr = Some(f()))
};
- unsafe {
- *their_packet.0.get() = Some(match (output, try_result) {
- (Some(data), Ok(_)) => Ok(data),
- (None, Err(cause)) => Err(cause),
- _ => unreachable!()
- });
- }
+ *their_packet.get() = Some(try_result.map(|()| {
+ output.unwrap()
+ }));
};
Ok(JoinInner {
- native: try!(unsafe { imp::create(stack_size, Box::new(main)) }),
+ native: Some(try!(imp::Thread::new(stack_size, Box::new(main)))),
thread: my_thread,
- packet: my_packet,
- joined: false,
+ packet: Packet(my_packet),
})
}
}
/// Gets a handle to the thread that invokes it.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn current() -> Thread {
- thread_info::current_thread()
+ thread_info::current_thread().expect("use of std::thread::current() is not \
+ possible after the thread's local \
+ data has been destroyed")
}
/// Cooperatively gives up a timeslice to the OS scheduler.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn yield_now() {
- unsafe { imp::yield_now() }
+ imp::Thread::yield_now()
}
/// Determines whether the current thread is unwinding because of panic.
/// spurious wakeup.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sleep_ms(ms: u32) {
- imp::sleep(Duration::milliseconds(ms as i64))
+ sleep(Duration::from_millis(ms as u64))
+}
+
+/// Puts the current thread to sleep for the specified amount of time.
+///
+/// The thread may sleep longer than the duration specified due to scheduling
+/// specifics or platform-dependent functionality.
+///
+/// # Platform behavior
+///
+/// On Unix platforms this function will not return early due to a
+/// signal being received or a spurious wakeup. Platforms which do not support
+/// nanosecond precision for sleeping will have `dur` rounded up to the nearest
+/// granularity of time they can sleep for.
+#[unstable(feature = "thread_sleep", reason = "waiting on Duration")]
+pub fn sleep(dur: Duration) {
+ imp::Thread::sleep(dur)
}
/// Blocks unless or until the current thread's token is made available (may wake spuriously).
/// the specified duration has been reached (may wake spuriously).
///
/// The semantics of this function are equivalent to `park()` except that the
-/// thread will be blocked for roughly no longer than *duration*. This method
+/// thread will be blocked for roughly no longer than *ms*. This method
/// should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
-/// amount of time waited to be precisely *duration* long.
+/// amount of time waited to be precisely *ms* long.
///
/// See the module doc for more detail.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn park_timeout_ms(ms: u32) {
+ park_timeout(Duration::from_millis(ms as u64))
+}
+
+/// Blocks unless or until the current thread's token is made available or
+/// the specified duration has been reached (may wake spuriously).
+///
+/// The semantics of this function are equivalent to `park()` except that the
+/// thread will be blocked for roughly no longer than *dur*. This method
+/// should not be used for precise timing due to anomalies such as
+/// preemption or platform differences that may not cause the maximum
+/// amount of time waited to be precisely *dur* long.
+///
+/// See the module doc for more detail.
+///
+/// # Platform behavior
+///
+/// Platforms which do not support nanosecond precision for sleeping will have
+/// `dur` rounded up to the nearest granularity of time they can sleep for.
+#[unstable(feature = "park_timeout", reason = "waiting on Duration")]
+pub fn park_timeout(dur: Duration) {
let thread = current();
let mut guard = thread.inner.lock.lock().unwrap();
if !*guard {
- let (g, _) = thread.inner.cvar.wait_timeout_ms(guard, ms).unwrap();
+ let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap();
guard = g;
}
*guard = false;
cvar: Condvar,
}
-unsafe impl Sync for Inner {}
-
#[derive(Clone)]
#[stable(feature = "rust1", since = "1.0.0")]
/// A handle to a thread.
#[stable(feature = "rust1", since = "1.0.0")]
pub type Result<T> = ::result::Result<T, Box<Any + Send + 'static>>;
+// This packet is used to communicate the return value between the child thread
+// and the parent thread. Memory is shared through the `Arc` within and there's
+// no need for a mutex here because synchronization happens with `join()` (the
+// parent thread never reads this packet until the child has exited).
+//
+// This packet itself is then stored into a `JoinInner` which in turns is placed
+// in `JoinHandle` and `JoinGuard`. Due to the usage of `UnsafeCell` we need to
+// manually worry about impls like Send and Sync. The type `T` should
+// already always be Send (otherwise the thread could not have been created) and
+// this type is inherently Sync because no methods take &self. Regardless,
+// however, we add inheriting impls for Send/Sync to this type to ensure it's
+// Send/Sync and that future modifications will still appropriately classify it.
struct Packet<T>(Arc<UnsafeCell<Option<Result<T>>>>);
-unsafe impl<T:Send> Send for Packet<T> {}
-unsafe impl<T> Sync for Packet<T> {}
+unsafe impl<T: Send> Send for Packet<T> {}
+unsafe impl<T: Sync> Sync for Packet<T> {}
/// Inner representation for JoinHandle and JoinGuard
struct JoinInner<T> {
- native: imp::rust_thread,
+ native: Option<imp::Thread>,
thread: Thread,
packet: Packet<T>,
- joined: bool,
}
impl<T> JoinInner<T> {
fn join(&mut self) -> Result<T> {
- assert!(!self.joined);
- unsafe { imp::join(self.native) };
- self.joined = true;
+ self.native.take().unwrap().join();
unsafe {
(*self.packet.0.get()).take().unwrap()
}
}
}
-#[stable(feature = "rust1", since = "1.0.0")]
-#[unsafe_destructor]
-impl<T> Drop for JoinHandle<T> {
- fn drop(&mut self) {
- if !self.0.joined {
- unsafe { imp::detach(self.0.native) }
- }
- }
-}
-
/// An RAII-style guard that will block until thread termination when dropped.
///
/// The type `T` is the return type for the thread's main function.
}
}
-#[unsafe_destructor]
#[unstable(feature = "scoped",
reason = "memory unsafe if destructor is avoided, see #24292")]
impl<'a, T: Send + 'a> Drop for JoinGuard<'a, T> {
fn drop(&mut self) {
- if !self.inner.joined {
- if self.inner.join().is_err() {
- panic!("child thread {:?} panicked", self.thread());
- }
+ if self.inner.native.is_some() && self.inner.join().is_err() {
+ panic!("child thread {:?} panicked", self.thread());
}
}
}
+fn _assert_sync_and_send() {
+ fn _assert_both<T: Send + Sync>() {}
+ _assert_both::<JoinHandle<()>>();
+ _assert_both::<JoinGuard<()>>();
+ _assert_both::<Thread>();
+}
+
////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
-mod test {
+mod tests {
use prelude::v1::*;
use any::Any;