]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | ////////////////////////////////////////////////////////////////////////////// |
2 | // | |
3 | // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost | |
4 | // Software License, Version 1.0. (See accompanying file | |
5 | // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // See http://www.boost.org/libs/interprocess for documentation. | |
8 | // | |
9 | ////////////////////////////////////////////////////////////////////////////// | |
10 | ||
11 | #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP | |
12 | #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP | |
13 | ||
14 | #ifndef BOOST_CONFIG_HPP | |
15 | # include <boost/config.hpp> | |
16 | #endif | |
17 | # | |
18 | #if defined(BOOST_HAS_PRAGMA_ONCE) | |
19 | # pragma once | |
20 | #endif | |
21 | ||
22 | #include <boost/interprocess/detail/config_begin.hpp> | |
23 | #include <boost/interprocess/detail/workaround.hpp> | |
24 | #include <boost/interprocess/sync/spin/mutex.hpp> | |
25 | #include <boost/interprocess/detail/posix_time_types_wrk.hpp> | |
26 | #include <boost/interprocess/detail/atomic.hpp> | |
27 | #include <boost/interprocess/sync/scoped_lock.hpp> | |
28 | #include <boost/interprocess/exceptions.hpp> | |
29 | #include <boost/interprocess/detail/os_thread_functions.hpp> | |
30 | #include <boost/interprocess/sync/spin/wait.hpp> | |
31 | #include <boost/move/utility_core.hpp> | |
32 | #include <boost/cstdint.hpp> | |
33 | ||
34 | namespace boost { | |
35 | namespace interprocess { | |
36 | namespace ipcdetail { | |
37 | ||
38 | class spin_condition | |
39 | { | |
40 | spin_condition(const spin_condition &); | |
41 | spin_condition &operator=(const spin_condition &); | |
42 | public: | |
43 | spin_condition(); | |
44 | ~spin_condition(); | |
45 | ||
46 | void notify_one(); | |
47 | void notify_all(); | |
48 | ||
49 | template <typename L> | |
50 | bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time) | |
51 | { | |
52 | if (!lock) | |
53 | throw lock_exception(); | |
54 | //Handle infinity absolute time here to avoid complications in do_timed_wait | |
55 | if(abs_time == boost::posix_time::pos_infin){ | |
56 | this->wait(lock); | |
57 | return true; | |
58 | } | |
59 | return this->do_timed_wait(abs_time, *lock.mutex()); | |
60 | } | |
61 | ||
62 | template <typename L, typename Pr> | |
63 | bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred) | |
64 | { | |
65 | if (!lock) | |
66 | throw lock_exception(); | |
67 | //Handle infinity absolute time here to avoid complications in do_timed_wait | |
68 | if(abs_time == boost::posix_time::pos_infin){ | |
69 | this->wait(lock, pred); | |
70 | return true; | |
71 | } | |
72 | while (!pred()){ | |
73 | if (!this->do_timed_wait(abs_time, *lock.mutex())) | |
74 | return pred(); | |
75 | } | |
76 | return true; | |
77 | } | |
78 | ||
79 | template <typename L> | |
80 | void wait(L& lock) | |
81 | { | |
82 | if (!lock) | |
83 | throw lock_exception(); | |
84 | do_wait(*lock.mutex()); | |
85 | } | |
86 | ||
87 | template <typename L, typename Pr> | |
88 | void wait(L& lock, Pr pred) | |
89 | { | |
90 | if (!lock) | |
91 | throw lock_exception(); | |
92 | ||
93 | while (!pred()) | |
94 | do_wait(*lock.mutex()); | |
95 | } | |
96 | ||
97 | template<class InterprocessMutex> | |
98 | void do_wait(InterprocessMutex &mut); | |
99 | ||
100 | template<class InterprocessMutex> | |
101 | bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); | |
102 | ||
103 | private: | |
104 | template<class InterprocessMutex> | |
105 | bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); | |
106 | ||
107 | enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL }; | |
108 | spin_mutex m_enter_mut; | |
109 | volatile boost::uint32_t m_command; | |
110 | volatile boost::uint32_t m_num_waiters; | |
111 | void notify(boost::uint32_t command); | |
112 | }; | |
113 | ||
114 | inline spin_condition::spin_condition() | |
115 | { | |
116 | //Note that this class is initialized to zero. | |
117 | //So zeroed memory can be interpreted as an initialized | |
118 | //condition variable | |
119 | m_command = SLEEP; | |
120 | m_num_waiters = 0; | |
121 | } | |
122 | ||
123 | inline spin_condition::~spin_condition() | |
124 | { | |
125 | //Notify all waiting threads | |
126 | //to allow POSIX semantics on condition destruction | |
127 | this->notify_all(); | |
128 | } | |
129 | ||
130 | inline void spin_condition::notify_one() | |
131 | { | |
132 | this->notify(NOTIFY_ONE); | |
133 | } | |
134 | ||
135 | inline void spin_condition::notify_all() | |
136 | { | |
137 | this->notify(NOTIFY_ALL); | |
138 | } | |
139 | ||
140 | inline void spin_condition::notify(boost::uint32_t command) | |
141 | { | |
142 | //This mutex guarantees that no other thread can enter to the | |
143 | //do_timed_wait method logic, so that thread count will be | |
144 | //constant until the function writes a NOTIFY_ALL command. | |
145 | //It also guarantees that no other notification can be signaled | |
146 | //on this spin_condition before this one ends | |
147 | m_enter_mut.lock(); | |
148 | ||
149 | //Return if there are no waiters | |
150 | if(!atomic_read32(&m_num_waiters)) { | |
151 | m_enter_mut.unlock(); | |
152 | return; | |
153 | } | |
154 | ||
155 | //Notify that all threads should execute wait logic | |
156 | spin_wait swait; | |
157 | while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){ | |
158 | swait.yield(); | |
159 | } | |
160 | //The enter mutex will rest locked until the last waiting thread unlocks it | |
161 | } | |
162 | ||
163 | template<class InterprocessMutex> | |
164 | inline void spin_condition::do_wait(InterprocessMutex &mut) | |
165 | { | |
166 | this->do_timed_wait(false, boost::posix_time::ptime(), mut); | |
167 | } | |
168 | ||
169 | template<class InterprocessMutex> | |
170 | inline bool spin_condition::do_timed_wait | |
171 | (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut) | |
172 | { | |
173 | return this->do_timed_wait(true, abs_time, mut); | |
174 | } | |
175 | ||
176 | template<class InterprocessMutex> | |
177 | inline bool spin_condition::do_timed_wait(bool tout_enabled, | |
178 | const boost::posix_time::ptime &abs_time, | |
179 | InterprocessMutex &mut) | |
180 | { | |
181 | boost::posix_time::ptime now = microsec_clock::universal_time(); | |
182 | ||
183 | if(tout_enabled){ | |
184 | if(now >= abs_time) return false; | |
185 | } | |
186 | ||
187 | typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock; | |
188 | //The enter mutex guarantees that while executing a notification, | |
189 | //no other thread can execute the do_timed_wait method. | |
190 | { | |
191 | //--------------------------------------------------------------- | |
192 | InternalLock lock; | |
193 | if(tout_enabled){ | |
194 | InternalLock dummy(m_enter_mut, abs_time); | |
195 | lock = boost::move(dummy); | |
196 | } | |
197 | else{ | |
198 | InternalLock dummy(m_enter_mut); | |
199 | lock = boost::move(dummy); | |
200 | } | |
201 | ||
202 | if(!lock) | |
203 | return false; | |
204 | //--------------------------------------------------------------- | |
205 | //We increment the waiting thread count protected so that it will be | |
206 | //always constant when another thread enters the notification logic. | |
207 | //The increment marks this thread as "waiting on spin_condition" | |
208 | atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters)); | |
209 | ||
210 | //We unlock the external mutex atomically with the increment | |
211 | mut.unlock(); | |
212 | } | |
213 | ||
214 | //By default, we suppose that no timeout has happened | |
215 | bool timed_out = false, unlock_enter_mut= false; | |
216 | ||
217 | //Loop until a notification indicates that the thread should | |
218 | //exit or timeout occurs | |
219 | while(1){ | |
220 | //The thread sleeps/spins until a spin_condition commands a notification | |
221 | //Notification occurred, we will lock the checking mutex so that | |
222 | spin_wait swait; | |
223 | while(atomic_read32(&m_command) == SLEEP){ | |
224 | swait.yield(); | |
225 | ||
226 | //Check for timeout | |
227 | if(tout_enabled){ | |
228 | now = microsec_clock::universal_time(); | |
229 | ||
230 | if(now >= abs_time){ | |
231 | //If we can lock the mutex it means that no notification | |
232 | //is being executed in this spin_condition variable | |
233 | timed_out = m_enter_mut.try_lock(); | |
234 | ||
235 | //If locking fails, indicates that another thread is executing | |
236 | //notification, so we play the notification game | |
237 | if(!timed_out){ | |
238 | //There is an ongoing notification, we will try again later | |
239 | continue; | |
240 | } | |
241 | //No notification in execution, since enter mutex is locked. | |
242 | //We will execute time-out logic, so we will decrement count, | |
243 | //release the enter mutex and return false. | |
244 | break; | |
245 | } | |
246 | } | |
247 | } | |
248 | ||
249 | //If a timeout occurred, the mutex will not execute checking logic | |
250 | if(tout_enabled && timed_out){ | |
251 | //Decrement wait count | |
252 | atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters)); | |
253 | unlock_enter_mut = true; | |
254 | break; | |
255 | } | |
256 | else{ | |
257 | boost::uint32_t result = atomic_cas32 | |
258 | (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE); | |
259 | if(result == SLEEP){ | |
260 | //Other thread has been notified and since it was a NOTIFY one | |
261 | //command, this thread must sleep again | |
262 | continue; | |
263 | } | |
264 | else if(result == NOTIFY_ONE){ | |
265 | //If it was a NOTIFY_ONE command, only this thread should | |
266 | //exit. This thread has atomically marked command as sleep before | |
267 | //so no other thread will exit. | |
268 | //Decrement wait count. | |
269 | unlock_enter_mut = true; | |
270 | atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters)); | |
271 | break; | |
272 | } | |
273 | else{ | |
274 | //If it is a NOTIFY_ALL command, all threads should return | |
275 | //from do_timed_wait function. Decrement wait count. | |
276 | unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters)); | |
277 | //Check if this is the last thread of notify_all waiters | |
278 | //Only the last thread will release the mutex | |
279 | if(unlock_enter_mut){ | |
280 | atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL); | |
281 | } | |
282 | break; | |
283 | } | |
284 | } | |
285 | } | |
286 | ||
287 | //Unlock the enter mutex if it is a single notification, if this is | |
288 | //the last notified thread in a notify_all or a timeout has occurred | |
289 | if(unlock_enter_mut){ | |
290 | m_enter_mut.unlock(); | |
291 | } | |
292 | ||
293 | //Lock external again before returning from the method | |
294 | mut.lock(); | |
295 | return !timed_out; | |
296 | } | |
297 | ||
298 | } //namespace ipcdetail | |
299 | } //namespace interprocess | |
300 | } //namespace boost | |
301 | ||
302 | #include <boost/interprocess/detail/config_end.hpp> | |
303 | ||
304 | #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP |