X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fboost%2Fboost%2Finterprocess%2Fsync%2Fspin%2Fcondition.hpp;fp=ceph%2Fsrc%2Fboost%2Fboost%2Finterprocess%2Fsync%2Fspin%2Fcondition.hpp;h=cf3a15970f78ca9841d45d458b92b907545ea0a7;hb=1e59de90020f1d8d374046ef9cca56ccd4e806e2;hp=af7c65cf7b933e3ee46b0fbe9cad98ad4d654475;hpb=bd41e436e25044e8e83156060a37c23cb661c364;p=ceph.git diff --git a/ceph/src/boost/boost/interprocess/sync/spin/condition.hpp b/ceph/src/boost/boost/interprocess/sync/spin/condition.hpp index af7c65cf7..cf3a15970 100644 --- a/ceph/src/boost/boost/interprocess/sync/spin/condition.hpp +++ b/ceph/src/boost/boost/interprocess/sync/spin/condition.hpp @@ -21,12 +21,14 @@ #include #include + +#include #include -#include #include #include #include #include +#include #include #include #include @@ -39,261 +41,253 @@ class spin_condition { spin_condition(const spin_condition &); spin_condition &operator=(const spin_condition &); + public: - spin_condition(); - ~spin_condition(); + spin_condition() + { + //Note that this class is initialized to zero. + //So zeroed memory can be interpreted as an initialized + //condition variable + m_command = SLEEP; + m_num_waiters = 0; + } + + ~spin_condition() + { + //Notify all waiting threads + //to allow POSIX semantics on condition destruction + this->notify_all(); + } - void notify_one(); - void notify_all(); + void notify_one() + { this->notify(NOTIFY_ONE); } + + void notify_all() + { this->notify(NOTIFY_ALL); } template - bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time) + void wait(L& lock) + { + if (!lock) + throw lock_exception(); + this->do_timed_wait_impl(0, *lock.mutex()); + } + + template + void wait(L& lock, Pr pred) + { + if (!lock) + throw lock_exception(); + + while (!pred()) + this->do_timed_wait_impl(0, *lock.mutex()); + } + + template + bool timed_wait(L& lock, const TimePoint &abs_time) { if (!lock) throw lock_exception(); //Handle infinity absolute time here to avoid complications in do_timed_wait - if(abs_time.is_pos_infinity()){ + if(is_pos_infinity(abs_time)){ this->wait(lock); return true; } - return this->do_timed_wait(abs_time, *lock.mutex()); + return this->do_timed_wait_impl(abs_time, *lock.mutex()); } - template - bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred) + template + bool timed_wait(L& lock, const TimePoint &abs_time, Pr pred) { if (!lock) throw lock_exception(); //Handle infinity absolute time here to avoid complications in do_timed_wait - if(abs_time.is_pos_infinity()){ + if(is_pos_infinity(abs_time)){ this->wait(lock, pred); return true; } while (!pred()){ - if (!this->do_timed_wait(abs_time, *lock.mutex())) + if (!this->do_timed_wait_impl(abs_time, *lock.mutex())) return pred(); } return true; } - template - void wait(L& lock) - { - if (!lock) - throw lock_exception(); - do_wait(*lock.mutex()); - } - - template - void wait(L& lock, Pr pred) - { - if (!lock) - throw lock_exception(); + template + cv_status wait_until(L& lock, const TimePoint &abs_time) + { return this->timed_wait(lock, abs_time) ? cv_status::no_timeout : cv_status::timeout; } - while (!pred()) - do_wait(*lock.mutex()); - } + template + bool wait_until(L& lock, const TimePoint &abs_time, Pr pred) + { return this->timed_wait(lock, abs_time, pred); } - template - void do_wait(InterprocessMutex &mut); + template + cv_status wait_for(L& lock, const Duration &dur) + { return this->wait_until(lock, duration_to_ustime(dur)); } - template - bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); + template + bool wait_for(L& lock, const Duration &dur, Pr pred) + { return this->wait_until(lock, duration_to_ustime(dur), pred); } private: - template - bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); - - enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL }; - spin_mutex m_enter_mut; - volatile boost::uint32_t m_command; - volatile boost::uint32_t m_num_waiters; - void notify(boost::uint32_t command); -}; -inline spin_condition::spin_condition() -{ - //Note that this class is initialized to zero. - //So zeroed memory can be interpreted as an initialized - //condition variable - m_command = SLEEP; - m_num_waiters = 0; -} - -inline spin_condition::~spin_condition() -{ - //Notify all waiting threads - //to allow POSIX semantics on condition destruction - this->notify_all(); -} + template + bool do_timed_wait_impl(const TimePoint &abs_time, InterprocessMutex &mut) + { + typedef boost::interprocess::scoped_lock InternalLock; + //The enter mutex guarantees that while executing a notification, + //no other thread can execute the do_timed_wait method. + { + //--------------------------------------------------------------- + InternalLock lock; + get_lock(bool_(), m_enter_mut, lock, abs_time); + + if(!lock) + return false; + //--------------------------------------------------------------- + //We increment the waiting thread count protected so that it will be + //always constant when another thread enters the notification logic. + //The increment marks this thread as "waiting on spin_condition" + atomic_inc32(const_cast(&m_num_waiters)); + + //We unlock the external mutex atomically with the increment + mut.unlock(); + } -inline void spin_condition::notify_one() -{ - this->notify(NOTIFY_ONE); -} + //By default, we suppose that no timeout has happened + bool timed_out = false, unlock_enter_mut= false; + + //Loop until a notification indicates that the thread should + //exit or timeout occurs + while(1){ + //The thread sleeps/spins until a spin_condition commands a notification + //Notification occurred, we will lock the checking mutex so that + spin_wait swait; + while(atomic_read32(&m_command) == SLEEP){ + swait.yield(); + + //Check for timeout + if(TimeoutEnabled){ + TimePoint now = get_now(bool_()); + + if(now >= abs_time){ + //If we can lock the mutex it means that no notification + //is being executed in this spin_condition variable + timed_out = m_enter_mut.try_lock(); + + //If locking fails, indicates that another thread is executing + //notification, so we play the notification game + if(!timed_out){ + //There is an ongoing notification, we will try again later + continue; + } + //No notification in execution, since enter mutex is locked. + //We will execute time-out logic, so we will decrement count, + //release the enter mutex and return false. + break; + } + } + } -inline void spin_condition::notify_all() -{ - this->notify(NOTIFY_ALL); -} + //If a timeout occurred, the mutex will not execute checking logic + if(TimeoutEnabled && timed_out){ + //Decrement wait count + atomic_dec32(const_cast(&m_num_waiters)); + unlock_enter_mut = true; + break; + } + else{ + boost::uint32_t result = atomic_cas32 + (const_cast(&m_command), SLEEP, NOTIFY_ONE); + if(result == SLEEP){ + //Other thread has been notified and since it was a NOTIFY one + //command, this thread must sleep again + continue; + } + else if(result == NOTIFY_ONE){ + //If it was a NOTIFY_ONE command, only this thread should + //exit. This thread has atomically marked command as sleep before + //so no other thread will exit. + //Decrement wait count. + unlock_enter_mut = true; + atomic_dec32(const_cast(&m_num_waiters)); + break; + } + else{ + //If it is a NOTIFY_ALL command, all threads should return + //from do_timed_wait function. Decrement wait count. + unlock_enter_mut = 1 == atomic_dec32(const_cast(&m_num_waiters)); + //Check if this is the last thread of notify_all waiters + //Only the last thread will release the mutex + if(unlock_enter_mut){ + atomic_cas32(const_cast(&m_command), SLEEP, NOTIFY_ALL); + } + break; + } + } + } -inline void spin_condition::notify(boost::uint32_t command) -{ - //This mutex guarantees that no other thread can enter to the - //do_timed_wait method logic, so that thread count will be - //constant until the function writes a NOTIFY_ALL command. - //It also guarantees that no other notification can be signaled - //on this spin_condition before this one ends - m_enter_mut.lock(); - - //Return if there are no waiters - if(!atomic_read32(&m_num_waiters)) { - m_enter_mut.unlock(); - return; - } + //Unlock the enter mutex if it is a single notification, if this is + //the last notified thread in a notify_all or a timeout has occurred + if(unlock_enter_mut){ + m_enter_mut.unlock(); + } - //Notify that all threads should execute wait logic - spin_wait swait; - while(SLEEP != atomic_cas32(const_cast(&m_command), command, SLEEP)){ - swait.yield(); + //Lock external again before returning from the method + mut.lock(); + return !timed_out; } - //The enter mutex will rest locked until the last waiting thread unlocks it -} -template -inline void spin_condition::do_wait(InterprocessMutex &mut) -{ - this->do_timed_wait(false, boost::posix_time::ptime(), mut); -} + template + static TimePoint get_now(bool_) + { return microsec_clock::universal_time(); } -template -inline bool spin_condition::do_timed_wait - (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut) -{ - return this->do_timed_wait(true, abs_time, mut); -} + template + static TimePoint get_now(bool_) + { return TimePoint(); } -template -inline bool spin_condition::do_timed_wait(bool tout_enabled, - const boost::posix_time::ptime &abs_time, - InterprocessMutex &mut) -{ - boost::posix_time::ptime now = microsec_clock::universal_time(); + template + static void get_lock(bool_, Mutex &m, Lock &lck, const TimePoint &abs_time) + { + Lock dummy(m, abs_time); + lck = boost::move(dummy); + } - if(tout_enabled){ - if(now >= abs_time) return false; + template + static void get_lock(bool_, Mutex &m, Lock &lck, const TimePoint &) + { + Lock dummy(m); + lck = boost::move(dummy); } - typedef boost::interprocess::scoped_lock InternalLock; - //The enter mutex guarantees that while executing a notification, - //no other thread can execute the do_timed_wait method. + void notify(boost::uint32_t command) { - //--------------------------------------------------------------- - InternalLock lock; - if(tout_enabled){ - InternalLock dummy(m_enter_mut, abs_time); - lock = boost::move(dummy); - } - else{ - InternalLock dummy(m_enter_mut); - lock = boost::move(dummy); + //This mutex guarantees that no other thread can enter to the + //do_timed_wait method logic, so that thread count will be + //constant until the function writes a NOTIFY_ALL command. + //It also guarantees that no other notification can be signaled + //on this spin_condition before this one ends + m_enter_mut.lock(); + + //Return if there are no waiters + if(!atomic_read32(&m_num_waiters)) { + m_enter_mut.unlock(); + return; } - if(!lock) - return false; - //--------------------------------------------------------------- - //We increment the waiting thread count protected so that it will be - //always constant when another thread enters the notification logic. - //The increment marks this thread as "waiting on spin_condition" - atomic_inc32(const_cast(&m_num_waiters)); - - //We unlock the external mutex atomically with the increment - mut.unlock(); - } - - //By default, we suppose that no timeout has happened - bool timed_out = false, unlock_enter_mut= false; - - //Loop until a notification indicates that the thread should - //exit or timeout occurs - while(1){ - //The thread sleeps/spins until a spin_condition commands a notification - //Notification occurred, we will lock the checking mutex so that + //Notify that all threads should execute wait logic spin_wait swait; - while(atomic_read32(&m_command) == SLEEP){ + while(SLEEP != atomic_cas32(const_cast(&m_command), command, SLEEP)){ swait.yield(); - - //Check for timeout - if(tout_enabled){ - now = microsec_clock::universal_time(); - - if(now >= abs_time){ - //If we can lock the mutex it means that no notification - //is being executed in this spin_condition variable - timed_out = m_enter_mut.try_lock(); - - //If locking fails, indicates that another thread is executing - //notification, so we play the notification game - if(!timed_out){ - //There is an ongoing notification, we will try again later - continue; - } - //No notification in execution, since enter mutex is locked. - //We will execute time-out logic, so we will decrement count, - //release the enter mutex and return false. - break; - } - } - } - - //If a timeout occurred, the mutex will not execute checking logic - if(tout_enabled && timed_out){ - //Decrement wait count - atomic_dec32(const_cast(&m_num_waiters)); - unlock_enter_mut = true; - break; - } - else{ - boost::uint32_t result = atomic_cas32 - (const_cast(&m_command), SLEEP, NOTIFY_ONE); - if(result == SLEEP){ - //Other thread has been notified and since it was a NOTIFY one - //command, this thread must sleep again - continue; - } - else if(result == NOTIFY_ONE){ - //If it was a NOTIFY_ONE command, only this thread should - //exit. This thread has atomically marked command as sleep before - //so no other thread will exit. - //Decrement wait count. - unlock_enter_mut = true; - atomic_dec32(const_cast(&m_num_waiters)); - break; - } - else{ - //If it is a NOTIFY_ALL command, all threads should return - //from do_timed_wait function. Decrement wait count. - unlock_enter_mut = 1 == atomic_dec32(const_cast(&m_num_waiters)); - //Check if this is the last thread of notify_all waiters - //Only the last thread will release the mutex - if(unlock_enter_mut){ - atomic_cas32(const_cast(&m_command), SLEEP, NOTIFY_ALL); - } - break; - } } + //The enter mutex will rest locked until the last waiting thread unlocks it } - //Unlock the enter mutex if it is a single notification, if this is - //the last notified thread in a notify_all or a timeout has occurred - if(unlock_enter_mut){ - m_enter_mut.unlock(); - } - - //Lock external again before returning from the method - mut.lock(); - return !timed_out; -} + enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL }; + spin_mutex m_enter_mut; + volatile boost::uint32_t m_command; + volatile boost::uint32_t m_num_waiters; +}; } //namespace ipcdetail } //namespace interprocess