]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | ////////////////////////////////////////////////////////////////////////////// |
2 | // | |
3 | // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost | |
4 | // Software License, Version 1.0. (See accompanying file | |
5 | // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // See http://www.boost.org/libs/interprocess for documentation. | |
8 | // | |
9 | ////////////////////////////////////////////////////////////////////////////// | |
10 | ||
11 | #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP | |
12 | #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP | |
13 | ||
14 | #ifndef BOOST_CONFIG_HPP | |
15 | # include <boost/config.hpp> | |
16 | #endif | |
17 | # | |
18 | #if defined(BOOST_HAS_PRAGMA_ONCE) | |
19 | # pragma once | |
20 | #endif | |
21 | ||
22 | #include <boost/interprocess/detail/config_begin.hpp> | |
23 | #include <boost/interprocess/detail/workaround.hpp> | |
24 | ||
25 | #include <boost/interprocess/shared_memory_object.hpp> | |
26 | #include <boost/interprocess/detail/managed_open_or_create_impl.hpp> | |
27 | #include <boost/interprocess/sync/interprocess_condition.hpp> | |
28 | #include <boost/interprocess/sync/interprocess_mutex.hpp> | |
29 | #include <boost/interprocess/sync/scoped_lock.hpp> | |
30 | #include <boost/interprocess/detail/utilities.hpp> | |
31 | #include <boost/interprocess/offset_ptr.hpp> | |
32 | #include <boost/interprocess/creation_tags.hpp> | |
33 | #include <boost/interprocess/exceptions.hpp> | |
34 | #include <boost/interprocess/permissions.hpp> | |
35 | #include <boost/core/no_exceptions_support.hpp> | |
36 | #include <boost/interprocess/detail/type_traits.hpp> | |
37 | #include <boost/intrusive/pointer_traits.hpp> | |
38 | #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of | |
39 | #include <boost/intrusive/pointer_traits.hpp> | |
40 | #include <boost/assert.hpp> | |
41 | #include <algorithm> //std::lower_bound | |
42 | #include <cstddef> //std::size_t | |
43 | #include <cstring> //memcpy | |
44 | ||
45 | ||
46 | //!\file | |
47 | //!Describes an inter-process message queue. This class allows sending | |
48 | //!messages between processes and allows blocking, non-blocking and timed | |
49 | //!sending and receiving. | |
50 | ||
51 | namespace boost{ namespace interprocess{ | |
52 | ||
53 | namespace ipcdetail | |
54 | { | |
55 | template<class VoidPointer> | |
56 | class msg_queue_initialization_func_t; | |
57 | } | |
58 | ||
59 | //!A class that allows sending messages | |
60 | //!between processes. | |
61 | template<class VoidPointer> | |
62 | class message_queue_t | |
63 | { | |
64 | #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) | |
65 | //Blocking modes | |
66 | enum block_t { blocking, timed, non_blocking }; | |
67 | ||
68 | message_queue_t(); | |
69 | #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED | |
70 | ||
71 | public: | |
72 | typedef VoidPointer void_pointer; | |
73 | typedef typename boost::intrusive:: | |
74 | pointer_traits<void_pointer>::template | |
75 | rebind_pointer<char>::type char_ptr; | |
76 | typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | |
77 | typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type; | |
78 | ||
79 | //!Creates a process shared message queue with name "name". For this message queue, | |
80 | //!the maximum number of messages will be "max_num_msg" and the maximum message size | |
81 | //!will be "max_msg_size". Throws on error and if the queue was previously created. | |
82 | message_queue_t(create_only_t create_only, | |
83 | const char *name, | |
84 | size_type max_num_msg, | |
85 | size_type max_msg_size, | |
86 | const permissions &perm = permissions()); | |
87 | ||
88 | //!Opens or creates a process shared message queue with name "name". | |
89 | //!If the queue is created, the maximum number of messages will be "max_num_msg" | |
90 | //!and the maximum message size will be "max_msg_size". If queue was previously | |
91 | //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters | |
92 | //!are ignored. Throws on error. | |
93 | message_queue_t(open_or_create_t open_or_create, | |
94 | const char *name, | |
95 | size_type max_num_msg, | |
96 | size_type max_msg_size, | |
97 | const permissions &perm = permissions()); | |
98 | ||
99 | //!Opens a previously created process shared message queue with name "name". | |
100 | //!If the queue was not previously created or there are no free resources, | |
101 | //!throws an error. | |
102 | message_queue_t(open_only_t open_only, | |
103 | const char *name); | |
104 | ||
105 | //!Destroys *this and indicates that the calling process is finished using | |
106 | //!the resource. All opened message queues are still | |
107 | //!valid after destruction. The destructor function will deallocate | |
108 | //!any system resources allocated by the system for use by this process for | |
109 | //!this resource. The resource can still be opened again calling | |
110 | //!the open constructor overload. To erase the message queue from the system | |
111 | //!use remove(). | |
112 | ~message_queue_t(); | |
113 | ||
114 | //!Sends a message stored in buffer "buffer" with size "buffer_size" in the | |
115 | //!message queue with priority "priority". If the message queue is full | |
116 | //!the sender is blocked. Throws interprocess_error on error. | |
117 | void send (const void *buffer, size_type buffer_size, | |
118 | unsigned int priority); | |
119 | ||
120 | //!Sends a message stored in buffer "buffer" with size "buffer_size" through the | |
121 | //!message queue with priority "priority". If the message queue is full | |
122 | //!the sender is not blocked and returns false, otherwise returns true. | |
123 | //!Throws interprocess_error on error. | |
124 | bool try_send (const void *buffer, size_type buffer_size, | |
125 | unsigned int priority); | |
126 | ||
127 | //!Sends a message stored in buffer "buffer" with size "buffer_size" in the | |
128 | //!message queue with priority "priority". If the message queue is full | |
129 | //!the sender retries until time "abs_time" is reached. Returns true if | |
130 | //!the message has been successfully sent. Returns false if timeout is reached. | |
131 | //!Throws interprocess_error on error. | |
132 | bool timed_send (const void *buffer, size_type buffer_size, | |
133 | unsigned int priority, const boost::posix_time::ptime& abs_time); | |
134 | ||
135 | //!Receives a message from the message queue. The message is stored in buffer | |
136 | //!"buffer", which has size "buffer_size". The received message has size | |
137 | //!"recvd_size" and priority "priority". If the message queue is empty | |
138 | //!the receiver is blocked. Throws interprocess_error on error. | |
139 | void receive (void *buffer, size_type buffer_size, | |
140 | size_type &recvd_size,unsigned int &priority); | |
141 | ||
142 | //!Receives a message from the message queue. The message is stored in buffer | |
143 | //!"buffer", which has size "buffer_size". The received message has size | |
144 | //!"recvd_size" and priority "priority". If the message queue is empty | |
145 | //!the receiver is not blocked and returns false, otherwise returns true. | |
146 | //!Throws interprocess_error on error. | |
147 | bool try_receive (void *buffer, size_type buffer_size, | |
148 | size_type &recvd_size,unsigned int &priority); | |
149 | ||
150 | //!Receives a message from the message queue. The message is stored in buffer | |
151 | //!"buffer", which has size "buffer_size". The received message has size | |
152 | //!"recvd_size" and priority "priority". If the message queue is empty | |
153 | //!the receiver retries until time "abs_time" is reached. Returns true if | |
154 | //!the message has been successfully sent. Returns false if timeout is reached. | |
155 | //!Throws interprocess_error on error. | |
156 | bool timed_receive (void *buffer, size_type buffer_size, | |
157 | size_type &recvd_size,unsigned int &priority, | |
158 | const boost::posix_time::ptime &abs_time); | |
159 | ||
160 | //!Returns the maximum number of messages allowed by the queue. The message | |
161 | //!queue must be opened or created previously. Otherwise, returns 0. | |
162 | //!Never throws | |
163 | size_type get_max_msg() const; | |
164 | ||
165 | //!Returns the maximum size of message allowed by the queue. The message | |
166 | //!queue must be opened or created previously. Otherwise, returns 0. | |
167 | //!Never throws | |
168 | size_type get_max_msg_size() const; | |
169 | ||
170 | //!Returns the number of messages currently stored. | |
171 | //!Never throws | |
172 | size_type get_num_msg() const; | |
173 | ||
174 | //!Removes the message queue from the system. | |
175 | //!Returns false on error. Never throws | |
176 | static bool remove(const char *name); | |
177 | ||
178 | #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) | |
179 | private: | |
180 | typedef boost::posix_time::ptime ptime; | |
181 | ||
182 | friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>; | |
183 | ||
184 | bool do_receive(block_t block, | |
185 | void *buffer, size_type buffer_size, | |
186 | size_type &recvd_size, unsigned int &priority, | |
187 | const ptime &abs_time); | |
188 | ||
189 | bool do_send(block_t block, | |
190 | const void *buffer, size_type buffer_size, | |
191 | unsigned int priority, const ptime &abs_time); | |
192 | ||
193 | //!Returns the needed memory size for the shared message queue. | |
194 | //!Never throws | |
195 | static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); | |
196 | typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; | |
197 | open_create_impl_t m_shmem; | |
198 | #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED | |
199 | }; | |
200 | ||
201 | #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) | |
202 | ||
203 | namespace ipcdetail { | |
204 | ||
205 | //!This header is the prefix of each message in the queue | |
206 | template<class VoidPointer> | |
207 | class msg_hdr_t | |
208 | { | |
209 | typedef VoidPointer void_pointer; | |
210 | typedef typename boost::intrusive:: | |
211 | pointer_traits<void_pointer>::template | |
212 | rebind_pointer<char>::type char_ptr; | |
213 | typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | |
214 | typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type; | |
215 | ||
216 | public: | |
217 | size_type len; // Message length | |
218 | unsigned int priority;// Message priority | |
219 | //!Returns the data buffer associated with this this message | |
220 | void * data(){ return this+1; } // | |
221 | }; | |
222 | ||
223 | //!This functor is the predicate to order stored messages by priority | |
224 | template<class VoidPointer> | |
225 | class priority_functor | |
226 | { | |
227 | typedef typename boost::intrusive:: | |
228 | pointer_traits<VoidPointer>::template | |
229 | rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t; | |
230 | ||
231 | public: | |
232 | bool operator()(const msg_hdr_ptr_t &msg1, | |
233 | const msg_hdr_ptr_t &msg2) const | |
234 | { return msg1->priority < msg2->priority; } | |
235 | }; | |
236 | ||
237 | //!This header is placed in the beginning of the shared memory and contains | |
238 | //!the data to control the queue. This class initializes the shared memory | |
239 | //!in the following way: in ascending memory address with proper alignment | |
240 | //!fillings: | |
241 | //! | |
242 | //!-> mq_hdr_t: | |
243 | //! Main control block that controls the rest of the elements | |
244 | //! | |
245 | //!-> offset_ptr<msg_hdr_t> index [max_num_msg] | |
246 | //! An array of pointers with size "max_num_msg" called index. Each pointer | |
247 | //! points to a preallocated message. Elements of this array are | |
248 | //! reordered in runtime in the following way: | |
249 | //! | |
250 | //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined: | |
251 | //! | |
252 | //! When the current number of messages is "cur_num_msg", the array | |
253 | //! is treated like a circular buffer. Starting from position "cur_first_msg" | |
254 | //! "cur_num_msg" in a circular way, pointers point to inserted messages and the rest | |
255 | //! point to free messages. Those "cur_num_msg" pointers are | |
256 | //! ordered by the priority of the pointed message and by insertion order | |
257 | //! if two messages have the same priority. So the next message to be | |
258 | //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] | |
259 | //! and the first free message ready to be used in a "send" operation is | |
260 | //! [cur_first_msg] if circular buffer is extended from front, | |
261 | //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. | |
262 | //! | |
263 | //! This transforms the index in a circular buffer with an embedded free | |
264 | //! message queue. | |
265 | //! | |
266 | //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined): | |
267 | //! | |
268 | //! When the current number of messages is "cur_num_msg", the first | |
269 | //! "cur_num_msg" pointers point to inserted messages and the rest | |
270 | //! point to free messages. The first "cur_num_msg" pointers are | |
271 | //! ordered by the priority of the pointed message and by insertion order | |
272 | //! if two messages have the same priority. So the next message to be | |
273 | //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free | |
274 | //! message ready to be used in a "send" operation is index [cur_num_msg]. | |
275 | //! | |
276 | //! This transforms the index in a fixed size priority queue with an embedded free | |
277 | //! message queue. | |
278 | //! | |
279 | //!-> struct message_t | |
280 | //! { | |
281 | //! msg_hdr_t header; | |
282 | //! char[max_msg_size] data; | |
283 | //! } messages [max_num_msg]; | |
284 | //! | |
285 | //! An array of buffers of preallocated messages, each one prefixed with the | |
286 | //! msg_hdr_t structure. Each of this message is pointed by one pointer of | |
287 | //! the index structure. | |
288 | template<class VoidPointer> | |
289 | class mq_hdr_t | |
290 | : public ipcdetail::priority_functor<VoidPointer> | |
291 | { | |
292 | typedef VoidPointer void_pointer; | |
293 | typedef msg_hdr_t<void_pointer> msg_header; | |
294 | typedef typename boost::intrusive:: | |
295 | pointer_traits<void_pointer>::template | |
296 | rebind_pointer<msg_header>::type msg_hdr_ptr_t; | |
297 | typedef typename boost::intrusive::pointer_traits | |
298 | <msg_hdr_ptr_t>::difference_type difference_type; | |
299 | typedef typename boost::container:: | |
300 | container_detail::make_unsigned<difference_type>::type size_type; | |
301 | typedef typename boost::intrusive:: | |
302 | pointer_traits<void_pointer>::template | |
303 | rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t; | |
304 | typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; | |
305 | ||
306 | public: | |
307 | //!Constructor. This object must be constructed in the beginning of the | |
308 | //!shared memory of the size returned by the function "get_mem_size". | |
309 | //!This constructor initializes the needed resources and creates | |
310 | //!the internal structures like the priority index. This can throw. | |
311 | mq_hdr_t(size_type max_num_msg, size_type max_msg_size) | |
312 | : m_max_num_msg(max_num_msg), | |
313 | m_max_msg_size(max_msg_size), | |
314 | m_cur_num_msg(0) | |
315 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
316 | ,m_cur_first_msg(0u) | |
317 | ,m_blocked_senders(0u) | |
318 | ,m_blocked_receivers(0u) | |
319 | #endif | |
320 | { this->initialize_memory(); } | |
321 | ||
322 | //!Returns true if the message queue is full | |
323 | bool is_full() const | |
324 | { return m_cur_num_msg == m_max_num_msg; } | |
325 | ||
326 | //!Returns true if the message queue is empty | |
327 | bool is_empty() const | |
328 | { return !m_cur_num_msg; } | |
329 | ||
330 | //!Frees the top priority message and saves it in the free message list | |
331 | void free_top_msg() | |
332 | { --m_cur_num_msg; } | |
333 | ||
334 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
335 | ||
336 | typedef msg_hdr_ptr_t *iterator; | |
337 | ||
338 | size_type end_pos() const | |
339 | { | |
340 | const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg; | |
341 | return space_until_bufend > m_cur_num_msg | |
342 | ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend; | |
343 | } | |
344 | ||
345 | //!Returns the inserted message with top priority | |
346 | msg_header &top_msg() | |
347 | { | |
348 | size_type pos = this->end_pos(); | |
349 | return *mp_index[pos ? --pos : m_max_num_msg - 1]; | |
350 | } | |
351 | ||
352 | //!Returns the inserted message with bottom priority | |
353 | msg_header &bottom_msg() | |
354 | { return *mp_index[m_cur_first_msg]; } | |
355 | ||
356 | iterator inserted_ptr_begin() const | |
357 | { return &mp_index[m_cur_first_msg]; } | |
358 | ||
359 | iterator inserted_ptr_end() const | |
360 | { return &mp_index[this->end_pos()]; } | |
361 | ||
362 | iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) | |
363 | { | |
364 | iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); | |
365 | if(end < begin){ | |
366 | iterator idx_end = &mp_index[m_max_num_msg]; | |
367 | iterator ret = std::lower_bound(begin, idx_end, value, func); | |
368 | if(idx_end == ret){ | |
369 | iterator idx_beg = &mp_index[0]; | |
370 | ret = std::lower_bound(idx_beg, end, value, func); | |
371 | //sanity check, these cases should not call lower_bound (optimized out) | |
372 | BOOST_ASSERT(ret != end); | |
373 | BOOST_ASSERT(ret != begin); | |
374 | return ret; | |
375 | } | |
376 | else{ | |
377 | return ret; | |
378 | } | |
379 | } | |
380 | else{ | |
381 | return std::lower_bound(begin, end, value, func); | |
382 | } | |
383 | } | |
384 | ||
385 | msg_header & insert_at(iterator where) | |
386 | { | |
387 | iterator it_inserted_ptr_end = this->inserted_ptr_end(); | |
388 | iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); | |
389 | if(where == it_inserted_ptr_beg){ | |
390 | //unsigned integer guarantees underflow | |
391 | m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; | |
392 | --m_cur_first_msg; | |
393 | ++m_cur_num_msg; | |
394 | return *mp_index[m_cur_first_msg]; | |
395 | } | |
396 | else if(where == it_inserted_ptr_end){ | |
397 | ++m_cur_num_msg; | |
398 | return **it_inserted_ptr_end; | |
399 | } | |
400 | else{ | |
401 | size_type pos = where - &mp_index[0]; | |
402 | size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); | |
403 | //Check if it's more efficient to move back or move front | |
404 | if(circ_pos < m_cur_num_msg/2){ | |
405 | //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos | |
406 | //indicates two step insertion | |
407 | if(!pos){ | |
408 | pos = m_max_num_msg; | |
409 | where = &mp_index[m_max_num_msg-1]; | |
410 | } | |
411 | else{ | |
412 | --where; | |
413 | } | |
414 | const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos; | |
415 | const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u; | |
416 | const size_type first_segment_end = pos; | |
417 | const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg; | |
418 | const size_type second_segment_end = m_max_num_msg; | |
419 | const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1); | |
420 | ||
421 | //First segment | |
422 | if(!unique_segment){ | |
423 | std::copy( &mp_index[0] + second_segment_beg | |
424 | , &mp_index[0] + second_segment_end | |
425 | , &mp_index[0] + second_segment_beg - 1); | |
426 | mp_index[m_max_num_msg-1] = mp_index[0]; | |
427 | } | |
428 | std::copy( &mp_index[0] + first_segment_beg | |
429 | , &mp_index[0] + first_segment_end | |
430 | , &mp_index[0] + first_segment_beg - 1); | |
431 | *where = backup; | |
432 | m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; | |
433 | --m_cur_first_msg; | |
434 | ++m_cur_num_msg; | |
435 | return **where; | |
436 | } | |
437 | else{ | |
438 | //The queue can't be full so end_pos < m_cur_first_msg | |
439 | //indicates two step insertion | |
440 | const size_type pos_end = this->end_pos(); | |
441 | const bool unique_segment = pos < pos_end; | |
442 | const size_type first_segment_beg = pos; | |
443 | const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1; | |
444 | const size_type second_segment_beg = 0u; | |
445 | const size_type second_segment_end = unique_segment ? 0u : pos_end; | |
446 | const msg_hdr_ptr_t backup = *it_inserted_ptr_end; | |
447 | ||
448 | //First segment | |
449 | if(!unique_segment){ | |
450 | std::copy_backward( &mp_index[0] + second_segment_beg | |
451 | , &mp_index[0] + second_segment_end | |
452 | , &mp_index[0] + second_segment_end + 1); | |
453 | mp_index[0] = mp_index[m_max_num_msg-1]; | |
454 | } | |
455 | std::copy_backward( &mp_index[0] + first_segment_beg | |
456 | , &mp_index[0] + first_segment_end | |
457 | , &mp_index[0] + first_segment_end + 1); | |
458 | *where = backup; | |
459 | ++m_cur_num_msg; | |
460 | return **where; | |
461 | } | |
462 | } | |
463 | } | |
464 | ||
465 | #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
466 | ||
467 | typedef msg_hdr_ptr_t *iterator; | |
468 | ||
469 | //!Returns the inserted message with top priority | |
470 | msg_header &top_msg() | |
471 | { return *mp_index[m_cur_num_msg-1]; } | |
472 | ||
473 | //!Returns the inserted message with bottom priority | |
474 | msg_header &bottom_msg() | |
475 | { return *mp_index[0]; } | |
476 | ||
477 | iterator inserted_ptr_begin() const | |
478 | { return &mp_index[0]; } | |
479 | ||
480 | iterator inserted_ptr_end() const | |
481 | { return &mp_index[m_cur_num_msg]; } | |
482 | ||
483 | iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) | |
484 | { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); } | |
485 | ||
486 | msg_header & insert_at(iterator pos) | |
487 | { | |
488 | const msg_hdr_ptr_t backup = *inserted_ptr_end(); | |
489 | std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1); | |
490 | *pos = backup; | |
491 | ++m_cur_num_msg; | |
492 | return **pos; | |
493 | } | |
494 | ||
495 | #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
496 | ||
497 | //!Inserts the first free message in the priority queue | |
498 | msg_header & queue_free_msg(unsigned int priority) | |
499 | { | |
500 | //Get priority queue's range | |
501 | iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end()); | |
502 | //Optimize for non-priority usage | |
503 | if(m_cur_num_msg && priority > this->bottom_msg().priority){ | |
504 | //Check for higher priority than all stored messages | |
505 | if(priority > this->top_msg().priority){ | |
506 | it = it_end; | |
507 | } | |
508 | else{ | |
509 | //Since we don't now which free message we will pick | |
510 | //build a dummy header for searches | |
511 | msg_header dummy_hdr; | |
512 | dummy_hdr.priority = priority; | |
513 | ||
514 | //Get free msg | |
515 | msg_hdr_ptr_t dummy_ptr(&dummy_hdr); | |
516 | ||
517 | //Check where the free message should be placed | |
518 | it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this)); | |
519 | } | |
520 | } | |
521 | //Insert the free message in the correct position | |
522 | return this->insert_at(it); | |
523 | } | |
524 | ||
525 | //!Returns the number of bytes needed to construct a message queue with | |
526 | //!"max_num_size" maximum number of messages and "max_msg_size" maximum | |
527 | //!message size. Never throws. | |
528 | static size_type get_mem_size | |
529 | (size_type max_msg_size, size_type max_num_msg) | |
530 | { | |
531 | const size_type | |
532 | msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value, | |
533 | index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value, | |
534 | r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, | |
535 | r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), | |
536 | r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header); | |
537 | return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + | |
538 | open_create_impl_t::ManagedOpenOrCreateUserOffset; | |
539 | } | |
540 | ||
541 | //!Initializes the memory structures to preallocate messages and constructs the | |
542 | //!message index. Never throws. | |
543 | void initialize_memory() | |
544 | { | |
545 | const size_type | |
546 | msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value, | |
547 | index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value, | |
548 | r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, | |
549 | r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), | |
550 | r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); | |
551 | ||
552 | //Pointer to the index | |
553 | msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> | |
554 | (reinterpret_cast<char*>(this)+r_hdr_size); | |
555 | ||
556 | //Pointer to the first message header | |
557 | msg_header *msg_hdr = reinterpret_cast<msg_header*> | |
558 | (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size); | |
559 | ||
560 | //Initialize the pointer to the index | |
561 | mp_index = index; | |
562 | ||
563 | //Initialize the index so each slot points to a preallocated message | |
564 | for(size_type i = 0; i < m_max_num_msg; ++i){ | |
565 | index[i] = msg_hdr; | |
566 | msg_hdr = reinterpret_cast<msg_header*> | |
567 | (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size); | |
568 | } | |
569 | } | |
570 | ||
571 | public: | |
572 | //Pointer to the index | |
573 | msg_hdr_ptr_ptr_t mp_index; | |
574 | //Maximum number of messages of the queue | |
575 | const size_type m_max_num_msg; | |
576 | //Maximum size of messages of the queue | |
577 | const size_type m_max_msg_size; | |
578 | //Current number of messages | |
579 | size_type m_cur_num_msg; | |
580 | //Mutex to protect data structures | |
581 | interprocess_mutex m_mutex; | |
582 | //Condition block receivers when there are no messages | |
583 | interprocess_condition m_cond_recv; | |
584 | //Condition block senders when the queue is full | |
585 | interprocess_condition m_cond_send; | |
586 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
587 | //Current start offset in the circular index | |
588 | size_type m_cur_first_msg; | |
589 | size_type m_blocked_senders; | |
590 | size_type m_blocked_receivers; | |
591 | #endif | |
592 | }; | |
593 | ||
594 | ||
595 | //!This is the atomic functor to be executed when creating or opening | |
596 | //!shared memory. Never throws | |
597 | template<class VoidPointer> | |
598 | class msg_queue_initialization_func_t | |
599 | { | |
600 | public: | |
601 | typedef typename boost::intrusive:: | |
602 | pointer_traits<VoidPointer>::template | |
603 | rebind_pointer<char>::type char_ptr; | |
604 | typedef typename boost::intrusive::pointer_traits<char_ptr>:: | |
605 | difference_type difference_type; | |
606 | typedef typename boost::container::container_detail:: | |
607 | make_unsigned<difference_type>::type size_type; | |
608 | ||
609 | msg_queue_initialization_func_t(size_type maxmsg = 0, | |
610 | size_type maxmsgsize = 0) | |
611 | : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} | |
612 | ||
613 | bool operator()(void *address, size_type, bool created) | |
614 | { | |
615 | char *mptr; | |
616 | ||
617 | if(created){ | |
618 | mptr = reinterpret_cast<char*>(address); | |
619 | //Construct the message queue header at the beginning | |
620 | BOOST_TRY{ | |
621 | new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize); | |
622 | } | |
623 | BOOST_CATCH(...){ | |
624 | return false; | |
625 | } | |
626 | BOOST_CATCH_END | |
627 | } | |
628 | return true; | |
629 | } | |
630 | ||
631 | std::size_t get_min_size() const | |
632 | { | |
633 | return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg) | |
634 | - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset; | |
635 | } | |
636 | ||
637 | const size_type m_maxmsg; | |
638 | const size_type m_maxmsgsize; | |
639 | }; | |
640 | ||
641 | } //namespace ipcdetail { | |
642 | ||
643 | template<class VoidPointer> | |
644 | inline message_queue_t<VoidPointer>::~message_queue_t() | |
645 | {} | |
646 | ||
647 | template<class VoidPointer> | |
648 | inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size | |
649 | (size_type max_msg_size, size_type max_num_msg) | |
650 | { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); } | |
651 | ||
652 | template<class VoidPointer> | |
653 | inline message_queue_t<VoidPointer>::message_queue_t(create_only_t, | |
654 | const char *name, | |
655 | size_type max_num_msg, | |
656 | size_type max_msg_size, | |
657 | const permissions &perm) | |
658 | //Create shared memory and execute functor atomically | |
659 | : m_shmem(create_only, | |
660 | name, | |
661 | get_mem_size(max_msg_size, max_num_msg), | |
662 | read_write, | |
663 | static_cast<void*>(0), | |
664 | //Prepare initialization functor | |
665 | ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size), | |
666 | perm) | |
667 | {} | |
668 | ||
669 | template<class VoidPointer> | |
670 | inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t, | |
671 | const char *name, | |
672 | size_type max_num_msg, | |
673 | size_type max_msg_size, | |
674 | const permissions &perm) | |
675 | //Create shared memory and execute functor atomically | |
676 | : m_shmem(open_or_create, | |
677 | name, | |
678 | get_mem_size(max_msg_size, max_num_msg), | |
679 | read_write, | |
680 | static_cast<void*>(0), | |
681 | //Prepare initialization functor | |
682 | ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size), | |
683 | perm) | |
684 | {} | |
685 | ||
686 | template<class VoidPointer> | |
687 | inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name) | |
688 | //Create shared memory and execute functor atomically | |
689 | : m_shmem(open_only, | |
690 | name, | |
691 | read_write, | |
692 | static_cast<void*>(0), | |
693 | //Prepare initialization functor | |
694 | ipcdetail::msg_queue_initialization_func_t<VoidPointer> ()) | |
695 | {} | |
696 | ||
697 | template<class VoidPointer> | |
698 | inline void message_queue_t<VoidPointer>::send | |
699 | (const void *buffer, size_type buffer_size, unsigned int priority) | |
700 | { this->do_send(blocking, buffer, buffer_size, priority, ptime()); } | |
701 | ||
702 | template<class VoidPointer> | |
703 | inline bool message_queue_t<VoidPointer>::try_send | |
704 | (const void *buffer, size_type buffer_size, unsigned int priority) | |
705 | { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); } | |
706 | ||
707 | template<class VoidPointer> | |
708 | inline bool message_queue_t<VoidPointer>::timed_send | |
709 | (const void *buffer, size_type buffer_size | |
710 | ,unsigned int priority, const boost::posix_time::ptime &abs_time) | |
711 | { | |
712 | if(abs_time == boost::posix_time::pos_infin){ | |
713 | this->send(buffer, buffer_size, priority); | |
714 | return true; | |
715 | } | |
716 | return this->do_send(timed, buffer, buffer_size, priority, abs_time); | |
717 | } | |
718 | ||
719 | template<class VoidPointer> | |
720 | inline bool message_queue_t<VoidPointer>::do_send(block_t block, | |
721 | const void *buffer, size_type buffer_size, | |
722 | unsigned int priority, const boost::posix_time::ptime &abs_time) | |
723 | { | |
724 | ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
725 | //Check if buffer is smaller than maximum allowed | |
726 | if (buffer_size > p_hdr->m_max_msg_size) { | |
727 | throw interprocess_exception(size_error); | |
728 | } | |
729 | ||
730 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
731 | bool notify_blocked_receivers = false; | |
732 | #endif | |
733 | //--------------------------------------------- | |
734 | scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | |
735 | //--------------------------------------------- | |
736 | { | |
737 | //If the queue is full execute blocking logic | |
738 | if (p_hdr->is_full()) { | |
739 | BOOST_TRY{ | |
740 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
741 | ++p_hdr->m_blocked_senders; | |
742 | #endif | |
743 | switch(block){ | |
744 | case non_blocking : | |
745 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
746 | --p_hdr->m_blocked_senders; | |
747 | #endif | |
748 | return false; | |
749 | break; | |
750 | ||
751 | case blocking : | |
752 | do{ | |
753 | p_hdr->m_cond_send.wait(lock); | |
754 | } | |
755 | while (p_hdr->is_full()); | |
756 | break; | |
757 | ||
758 | case timed : | |
759 | do{ | |
760 | if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ | |
761 | if(p_hdr->is_full()){ | |
762 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
763 | --p_hdr->m_blocked_senders; | |
764 | #endif | |
765 | return false; | |
766 | } | |
767 | break; | |
768 | } | |
769 | } | |
770 | while (p_hdr->is_full()); | |
771 | break; | |
772 | default: | |
773 | break; | |
774 | } | |
775 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
776 | --p_hdr->m_blocked_senders; | |
777 | #endif | |
778 | } | |
779 | BOOST_CATCH(...){ | |
780 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
781 | --p_hdr->m_blocked_senders; | |
782 | #endif | |
783 | BOOST_RETHROW; | |
784 | } | |
785 | BOOST_CATCH_END | |
786 | } | |
787 | ||
788 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
789 | notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers; | |
790 | #endif | |
791 | //Insert the first free message in the priority queue | |
792 | ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority); | |
793 | ||
794 | //Sanity check, free msgs are always cleaned when received | |
795 | BOOST_ASSERT(free_msg_hdr.priority == 0); | |
796 | BOOST_ASSERT(free_msg_hdr.len == 0); | |
797 | ||
798 | //Copy control data to the free message | |
799 | free_msg_hdr.priority = priority; | |
800 | free_msg_hdr.len = buffer_size; | |
801 | ||
802 | //Copy user buffer to the message | |
803 | std::memcpy(free_msg_hdr.data(), buffer, buffer_size); | |
804 | } // Lock end | |
805 | ||
806 | //Notify outside lock to avoid contention. This might produce some | |
807 | //spurious wakeups, but it's usually far better than notifying inside. | |
808 | //If this message changes the queue empty state, notify it to receivers | |
809 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
810 | if (notify_blocked_receivers){ | |
811 | p_hdr->m_cond_recv.notify_one(); | |
812 | } | |
813 | #else | |
814 | p_hdr->m_cond_recv.notify_one(); | |
815 | #endif | |
816 | ||
817 | return true; | |
818 | } | |
819 | ||
820 | template<class VoidPointer> | |
821 | inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size, | |
822 | size_type &recvd_size, unsigned int &priority) | |
823 | { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); } | |
824 | ||
825 | template<class VoidPointer> | |
826 | inline bool | |
827 | message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size, | |
828 | size_type &recvd_size, unsigned int &priority) | |
829 | { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); } | |
830 | ||
831 | template<class VoidPointer> | |
832 | inline bool | |
833 | message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size, | |
834 | size_type &recvd_size, unsigned int &priority, | |
835 | const boost::posix_time::ptime &abs_time) | |
836 | { | |
837 | if(abs_time == boost::posix_time::pos_infin){ | |
838 | this->receive(buffer, buffer_size, recvd_size, priority); | |
839 | return true; | |
840 | } | |
841 | return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time); | |
842 | } | |
843 | ||
844 | template<class VoidPointer> | |
845 | inline bool | |
846 | message_queue_t<VoidPointer>::do_receive(block_t block, | |
847 | void *buffer, size_type buffer_size, | |
848 | size_type &recvd_size, unsigned int &priority, | |
849 | const boost::posix_time::ptime &abs_time) | |
850 | { | |
851 | ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
852 | //Check if buffer is big enough for any message | |
853 | if (buffer_size < p_hdr->m_max_msg_size) { | |
854 | throw interprocess_exception(size_error); | |
855 | } | |
856 | ||
857 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
858 | bool notify_blocked_senders = false; | |
859 | #endif | |
860 | //--------------------------------------------- | |
861 | scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | |
862 | //--------------------------------------------- | |
863 | { | |
864 | //If there are no messages execute blocking logic | |
865 | if (p_hdr->is_empty()) { | |
866 | BOOST_TRY{ | |
867 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
868 | ++p_hdr->m_blocked_receivers; | |
869 | #endif | |
870 | switch(block){ | |
871 | case non_blocking : | |
872 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
873 | --p_hdr->m_blocked_receivers; | |
874 | #endif | |
875 | return false; | |
876 | break; | |
877 | ||
878 | case blocking : | |
879 | do{ | |
880 | p_hdr->m_cond_recv.wait(lock); | |
881 | } | |
882 | while (p_hdr->is_empty()); | |
883 | break; | |
884 | ||
885 | case timed : | |
886 | do{ | |
887 | if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ | |
888 | if(p_hdr->is_empty()){ | |
889 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
890 | --p_hdr->m_blocked_receivers; | |
891 | #endif | |
892 | return false; | |
893 | } | |
894 | break; | |
895 | } | |
896 | } | |
897 | while (p_hdr->is_empty()); | |
898 | break; | |
899 | ||
900 | //Paranoia check | |
901 | default: | |
902 | break; | |
903 | } | |
904 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
905 | --p_hdr->m_blocked_receivers; | |
906 | #endif | |
907 | } | |
908 | BOOST_CATCH(...){ | |
909 | #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
910 | --p_hdr->m_blocked_receivers; | |
911 | #endif | |
912 | BOOST_RETHROW; | |
913 | } | |
914 | BOOST_CATCH_END | |
915 | } | |
916 | ||
917 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
918 | notify_blocked_senders = 0 != p_hdr->m_blocked_senders; | |
919 | #endif | |
920 | ||
921 | //There is at least one message ready to pick, get the top one | |
922 | ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg(); | |
923 | ||
924 | //Get data from the message | |
925 | recvd_size = top_msg.len; | |
926 | priority = top_msg.priority; | |
927 | ||
928 | //Some cleanup to ease debugging | |
929 | top_msg.len = 0; | |
930 | top_msg.priority = 0; | |
931 | ||
932 | //Copy data to receiver's bufers | |
933 | std::memcpy(buffer, top_msg.data(), recvd_size); | |
934 | ||
935 | //Free top message and put it in the free message list | |
936 | p_hdr->free_top_msg(); | |
937 | } //Lock end | |
938 | ||
939 | //Notify outside lock to avoid contention. This might produce some | |
940 | //spurious wakeups, but it's usually far better than notifying inside. | |
941 | //If this reception changes the queue full state, notify senders | |
942 | #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
943 | if (notify_blocked_senders){ | |
944 | p_hdr->m_cond_send.notify_one(); | |
945 | } | |
946 | #else | |
947 | p_hdr->m_cond_send.notify_one(); | |
948 | #endif | |
949 | ||
950 | return true; | |
951 | } | |
952 | ||
953 | template<class VoidPointer> | |
954 | inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const | |
955 | { | |
956 | ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
957 | return p_hdr ? p_hdr->m_max_num_msg : 0; } | |
958 | ||
959 | template<class VoidPointer> | |
960 | inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const | |
961 | { | |
962 | ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
963 | return p_hdr ? p_hdr->m_max_msg_size : 0; | |
964 | } | |
965 | ||
966 | template<class VoidPointer> | |
967 | inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const | |
968 | { | |
969 | ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
970 | if(p_hdr){ | |
971 | //--------------------------------------------- | |
972 | scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | |
973 | //--------------------------------------------- | |
974 | return p_hdr->m_cur_num_msg; | |
975 | } | |
976 | ||
977 | return 0; | |
978 | } | |
979 | ||
980 | template<class VoidPointer> | |
981 | inline bool message_queue_t<VoidPointer>::remove(const char *name) | |
982 | { return shared_memory_object::remove(name); } | |
983 | ||
984 | #else | |
985 | ||
986 | //!Typedef for a default message queue | |
987 | //!to be used between processes | |
988 | typedef message_queue_t<offset_ptr<void> > message_queue; | |
989 | ||
990 | #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED | |
991 | ||
992 | }} //namespace boost{ namespace interprocess{ | |
993 | ||
994 | #include <boost/interprocess/detail/config_end.hpp> | |
995 | ||
996 | #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP |