#include <boost/interprocess/detail/config_begin.hpp>
#include <boost/interprocess/detail/workaround.hpp>
+
+#include <boost/interprocess/sync/cv_status.hpp>
#include <boost/interprocess/sync/spin/mutex.hpp>
-#include <boost/interprocess/detail/posix_time_types_wrk.hpp>
#include <boost/interprocess/detail/atomic.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/detail/os_thread_functions.hpp>
+#include <boost/interprocess/detail/timed_utils.hpp>
#include <boost/interprocess/sync/spin/wait.hpp>
#include <boost/move/utility_core.hpp>
#include <boost/cstdint.hpp>
{
spin_condition(const spin_condition &);
spin_condition &operator=(const spin_condition &);
+
public:
- spin_condition();
- ~spin_condition();
+ spin_condition()
+ {
+ //Note that this class is initialized to zero.
+ //So zeroed memory can be interpreted as an initialized
+ //condition variable
+ m_command = SLEEP;
+ m_num_waiters = 0;
+ }
+
+ ~spin_condition()
+ {
+ //Notify all waiting threads
+ //to allow POSIX semantics on condition destruction
+ this->notify_all();
+ }
- void notify_one();
- void notify_all();
+ void notify_one()
+ { this->notify(NOTIFY_ONE); }
+
+ void notify_all()
+ { this->notify(NOTIFY_ALL); }
template <typename L>
- bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
+ void wait(L& lock)
+ {
+ if (!lock)
+ throw lock_exception();
+ this->do_timed_wait_impl<false>(0, *lock.mutex());
+ }
+
+ template <typename L, typename Pr>
+ void wait(L& lock, Pr pred)
+ {
+ if (!lock)
+ throw lock_exception();
+
+ while (!pred())
+ this->do_timed_wait_impl<false>(0, *lock.mutex());
+ }
+
+ template <typename L, typename TimePoint>
+ bool timed_wait(L& lock, const TimePoint &abs_time)
{
if (!lock)
throw lock_exception();
//Handle infinity absolute time here to avoid complications in do_timed_wait
- if(abs_time.is_pos_infinity()){
+ if(is_pos_infinity(abs_time)){
this->wait(lock);
return true;
}
- return this->do_timed_wait(abs_time, *lock.mutex());
+ return this->do_timed_wait_impl<true>(abs_time, *lock.mutex());
}
- template <typename L, typename Pr>
- bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
+ template <typename L, typename TimePoint, typename Pr>
+ bool timed_wait(L& lock, const TimePoint &abs_time, Pr pred)
{
if (!lock)
throw lock_exception();
//Handle infinity absolute time here to avoid complications in do_timed_wait
- if(abs_time.is_pos_infinity()){
+ if(is_pos_infinity(abs_time)){
this->wait(lock, pred);
return true;
}
while (!pred()){
- if (!this->do_timed_wait(abs_time, *lock.mutex()))
+ if (!this->do_timed_wait_impl<true>(abs_time, *lock.mutex()))
return pred();
}
return true;
}
- template <typename L>
- void wait(L& lock)
- {
- if (!lock)
- throw lock_exception();
- do_wait(*lock.mutex());
- }
-
- template <typename L, typename Pr>
- void wait(L& lock, Pr pred)
- {
- if (!lock)
- throw lock_exception();
+ template <typename L, class TimePoint>
+ cv_status wait_until(L& lock, const TimePoint &abs_time)
+ { return this->timed_wait(lock, abs_time) ? cv_status::no_timeout : cv_status::timeout; }
- while (!pred())
- do_wait(*lock.mutex());
- }
+ template <typename L, class TimePoint, typename Pr>
+ bool wait_until(L& lock, const TimePoint &abs_time, Pr pred)
+ { return this->timed_wait(lock, abs_time, pred); }
- template<class InterprocessMutex>
- void do_wait(InterprocessMutex &mut);
+ template <typename L, class Duration>
+ cv_status wait_for(L& lock, const Duration &dur)
+ { return this->wait_until(lock, duration_to_ustime(dur)); }
- template<class InterprocessMutex>
- bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
+ template <typename L, class Duration, typename Pr>
+ bool wait_for(L& lock, const Duration &dur, Pr pred)
+ { return this->wait_until(lock, duration_to_ustime(dur), pred); }
private:
- template<class InterprocessMutex>
- bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
-
- enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
- spin_mutex m_enter_mut;
- volatile boost::uint32_t m_command;
- volatile boost::uint32_t m_num_waiters;
- void notify(boost::uint32_t command);
-};
-inline spin_condition::spin_condition()
-{
- //Note that this class is initialized to zero.
- //So zeroed memory can be interpreted as an initialized
- //condition variable
- m_command = SLEEP;
- m_num_waiters = 0;
-}
-
-inline spin_condition::~spin_condition()
-{
- //Notify all waiting threads
- //to allow POSIX semantics on condition destruction
- this->notify_all();
-}
+ template<bool TimeoutEnabled, class InterprocessMutex, class TimePoint>
+ bool do_timed_wait_impl(const TimePoint &abs_time, InterprocessMutex &mut)
+ {
+ typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
+ //The enter mutex guarantees that while executing a notification,
+ //no other thread can execute the do_timed_wait method.
+ {
+ //---------------------------------------------------------------
+ InternalLock lock;
+ get_lock(bool_<TimeoutEnabled>(), m_enter_mut, lock, abs_time);
+
+ if(!lock)
+ return false;
+ //---------------------------------------------------------------
+ //We increment the waiting thread count protected so that it will be
+ //always constant when another thread enters the notification logic.
+ //The increment marks this thread as "waiting on spin_condition"
+ atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
+
+ //We unlock the external mutex atomically with the increment
+ mut.unlock();
+ }
-inline void spin_condition::notify_one()
-{
- this->notify(NOTIFY_ONE);
-}
+ //By default, we suppose that no timeout has happened
+ bool timed_out = false, unlock_enter_mut= false;
+
+ //Loop until a notification indicates that the thread should
+ //exit or timeout occurs
+ while(1){
+ //The thread sleeps/spins until a spin_condition commands a notification
+ //Notification occurred, we will lock the checking mutex so that
+ spin_wait swait;
+ while(atomic_read32(&m_command) == SLEEP){
+ swait.yield();
+
+ //Check for timeout
+ if(TimeoutEnabled){
+ TimePoint now = get_now<TimePoint>(bool_<TimeoutEnabled>());
+
+ if(now >= abs_time){
+ //If we can lock the mutex it means that no notification
+ //is being executed in this spin_condition variable
+ timed_out = m_enter_mut.try_lock();
+
+ //If locking fails, indicates that another thread is executing
+ //notification, so we play the notification game
+ if(!timed_out){
+ //There is an ongoing notification, we will try again later
+ continue;
+ }
+ //No notification in execution, since enter mutex is locked.
+ //We will execute time-out logic, so we will decrement count,
+ //release the enter mutex and return false.
+ break;
+ }
+ }
+ }
-inline void spin_condition::notify_all()
-{
- this->notify(NOTIFY_ALL);
-}
+ //If a timeout occurred, the mutex will not execute checking logic
+ if(TimeoutEnabled && timed_out){
+ //Decrement wait count
+ atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
+ unlock_enter_mut = true;
+ break;
+ }
+ else{
+ boost::uint32_t result = atomic_cas32
+ (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
+ if(result == SLEEP){
+ //Other thread has been notified and since it was a NOTIFY one
+ //command, this thread must sleep again
+ continue;
+ }
+ else if(result == NOTIFY_ONE){
+ //If it was a NOTIFY_ONE command, only this thread should
+ //exit. This thread has atomically marked command as sleep before
+ //so no other thread will exit.
+ //Decrement wait count.
+ unlock_enter_mut = true;
+ atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
+ break;
+ }
+ else{
+ //If it is a NOTIFY_ALL command, all threads should return
+ //from do_timed_wait function. Decrement wait count.
+ unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
+ //Check if this is the last thread of notify_all waiters
+ //Only the last thread will release the mutex
+ if(unlock_enter_mut){
+ atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
+ }
+ break;
+ }
+ }
+ }
-inline void spin_condition::notify(boost::uint32_t command)
-{
- //This mutex guarantees that no other thread can enter to the
- //do_timed_wait method logic, so that thread count will be
- //constant until the function writes a NOTIFY_ALL command.
- //It also guarantees that no other notification can be signaled
- //on this spin_condition before this one ends
- m_enter_mut.lock();
-
- //Return if there are no waiters
- if(!atomic_read32(&m_num_waiters)) {
- m_enter_mut.unlock();
- return;
- }
+ //Unlock the enter mutex if it is a single notification, if this is
+ //the last notified thread in a notify_all or a timeout has occurred
+ if(unlock_enter_mut){
+ m_enter_mut.unlock();
+ }
- //Notify that all threads should execute wait logic
- spin_wait swait;
- while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
- swait.yield();
+ //Lock external again before returning from the method
+ mut.lock();
+ return !timed_out;
}
- //The enter mutex will rest locked until the last waiting thread unlocks it
-}
-template<class InterprocessMutex>
-inline void spin_condition::do_wait(InterprocessMutex &mut)
-{
- this->do_timed_wait(false, boost::posix_time::ptime(), mut);
-}
+ template <class TimePoint>
+ static TimePoint get_now(bool_<true>)
+ { return microsec_clock<TimePoint>::universal_time(); }
-template<class InterprocessMutex>
-inline bool spin_condition::do_timed_wait
- (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
-{
- return this->do_timed_wait(true, abs_time, mut);
-}
+ template <class TimePoint>
+ static TimePoint get_now(bool_<false>)
+ { return TimePoint(); }
-template<class InterprocessMutex>
-inline bool spin_condition::do_timed_wait(bool tout_enabled,
- const boost::posix_time::ptime &abs_time,
- InterprocessMutex &mut)
-{
- boost::posix_time::ptime now = microsec_clock::universal_time();
+ template <class Mutex, class Lock, class TimePoint>
+ static void get_lock(bool_<true>, Mutex &m, Lock &lck, const TimePoint &abs_time)
+ {
+ Lock dummy(m, abs_time);
+ lck = boost::move(dummy);
+ }
- if(tout_enabled){
- if(now >= abs_time) return false;
+ template <class Mutex, class Lock, class TimePoint>
+ static void get_lock(bool_<false>, Mutex &m, Lock &lck, const TimePoint &)
+ {
+ Lock dummy(m);
+ lck = boost::move(dummy);
}
- typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
- //The enter mutex guarantees that while executing a notification,
- //no other thread can execute the do_timed_wait method.
+ void notify(boost::uint32_t command)
{
- //---------------------------------------------------------------
- InternalLock lock;
- if(tout_enabled){
- InternalLock dummy(m_enter_mut, abs_time);
- lock = boost::move(dummy);
- }
- else{
- InternalLock dummy(m_enter_mut);
- lock = boost::move(dummy);
+ //This mutex guarantees that no other thread can enter to the
+ //do_timed_wait method logic, so that thread count will be
+ //constant until the function writes a NOTIFY_ALL command.
+ //It also guarantees that no other notification can be signaled
+ //on this spin_condition before this one ends
+ m_enter_mut.lock();
+
+ //Return if there are no waiters
+ if(!atomic_read32(&m_num_waiters)) {
+ m_enter_mut.unlock();
+ return;
}
- if(!lock)
- return false;
- //---------------------------------------------------------------
- //We increment the waiting thread count protected so that it will be
- //always constant when another thread enters the notification logic.
- //The increment marks this thread as "waiting on spin_condition"
- atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
-
- //We unlock the external mutex atomically with the increment
- mut.unlock();
- }
-
- //By default, we suppose that no timeout has happened
- bool timed_out = false, unlock_enter_mut= false;
-
- //Loop until a notification indicates that the thread should
- //exit or timeout occurs
- while(1){
- //The thread sleeps/spins until a spin_condition commands a notification
- //Notification occurred, we will lock the checking mutex so that
+ //Notify that all threads should execute wait logic
spin_wait swait;
- while(atomic_read32(&m_command) == SLEEP){
+ while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
swait.yield();
-
- //Check for timeout
- if(tout_enabled){
- now = microsec_clock::universal_time();
-
- if(now >= abs_time){
- //If we can lock the mutex it means that no notification
- //is being executed in this spin_condition variable
- timed_out = m_enter_mut.try_lock();
-
- //If locking fails, indicates that another thread is executing
- //notification, so we play the notification game
- if(!timed_out){
- //There is an ongoing notification, we will try again later
- continue;
- }
- //No notification in execution, since enter mutex is locked.
- //We will execute time-out logic, so we will decrement count,
- //release the enter mutex and return false.
- break;
- }
- }
- }
-
- //If a timeout occurred, the mutex will not execute checking logic
- if(tout_enabled && timed_out){
- //Decrement wait count
- atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
- unlock_enter_mut = true;
- break;
- }
- else{
- boost::uint32_t result = atomic_cas32
- (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
- if(result == SLEEP){
- //Other thread has been notified and since it was a NOTIFY one
- //command, this thread must sleep again
- continue;
- }
- else if(result == NOTIFY_ONE){
- //If it was a NOTIFY_ONE command, only this thread should
- //exit. This thread has atomically marked command as sleep before
- //so no other thread will exit.
- //Decrement wait count.
- unlock_enter_mut = true;
- atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
- break;
- }
- else{
- //If it is a NOTIFY_ALL command, all threads should return
- //from do_timed_wait function. Decrement wait count.
- unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
- //Check if this is the last thread of notify_all waiters
- //Only the last thread will release the mutex
- if(unlock_enter_mut){
- atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
- }
- break;
- }
}
+ //The enter mutex will rest locked until the last waiting thread unlocks it
}
- //Unlock the enter mutex if it is a single notification, if this is
- //the last notified thread in a notify_all or a timeout has occurred
- if(unlock_enter_mut){
- m_enter_mut.unlock();
- }
-
- //Lock external again before returning from the method
- mut.lock();
- return !timed_out;
-}
+ enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
+ spin_mutex m_enter_mut;
+ volatile boost::uint32_t m_command;
+ volatile boost::uint32_t m_num_waiters;
+};
} //namespace ipcdetail
} //namespace interprocess