]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/interprocess/include/boost/interprocess/ipc/message_queue.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / interprocess / include / boost / interprocess / ipc / message_queue.hpp
CommitLineData
7c673cae
FG
1//////////////////////////////////////////////////////////////////////////////
2//
3// (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
4// Software License, Version 1.0. (See accompanying file
5// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// See http://www.boost.org/libs/interprocess for documentation.
8//
9//////////////////////////////////////////////////////////////////////////////
10
11#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
12#define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
13
14#ifndef BOOST_CONFIG_HPP
15# include <boost/config.hpp>
16#endif
17#
18#if defined(BOOST_HAS_PRAGMA_ONCE)
19# pragma once
20#endif
21
22#include <boost/interprocess/detail/config_begin.hpp>
23#include <boost/interprocess/detail/workaround.hpp>
24
25#include <boost/interprocess/shared_memory_object.hpp>
26#include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
27#include <boost/interprocess/sync/interprocess_condition.hpp>
28#include <boost/interprocess/sync/interprocess_mutex.hpp>
29#include <boost/interprocess/sync/scoped_lock.hpp>
30#include <boost/interprocess/detail/utilities.hpp>
31#include <boost/interprocess/offset_ptr.hpp>
32#include <boost/interprocess/creation_tags.hpp>
33#include <boost/interprocess/exceptions.hpp>
34#include <boost/interprocess/permissions.hpp>
35#include <boost/core/no_exceptions_support.hpp>
36#include <boost/interprocess/detail/type_traits.hpp>
37#include <boost/intrusive/pointer_traits.hpp>
38#include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
39#include <boost/intrusive/pointer_traits.hpp>
40#include <boost/assert.hpp>
41#include <algorithm> //std::lower_bound
42#include <cstddef> //std::size_t
43#include <cstring> //memcpy
44
45
46//!\file
47//!Describes an inter-process message queue. This class allows sending
48//!messages between processes and allows blocking, non-blocking and timed
49//!sending and receiving.
50
51namespace boost{ namespace interprocess{
52
53namespace ipcdetail
54{
 //Forward declaration of the functor used to initialize the shared memory
 //of a message queue. It is declared here so that message_queue_t can name
 //it as a friend before the functor is defined.
55 template<class VoidPointer>
56 class msg_queue_initialization_func_t;
57}
58
59//!A class that allows sending messages
60//!between processes. The queue is backed by a named shared memory object
//!(see the m_shmem member) and is parameterized on the void pointer type
//!used to derive the size and pointer types stored in that memory.
61template<class VoidPointer>
62class message_queue_t
63{
64 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
65 //Blocking modes selected by the public send/receive overloads and
 //forwarded to do_send/do_receive
66 enum block_t { blocking, timed, non_blocking };
67
 //Default constructor is declared in the private section, so user code
 //cannot default-construct a message queue
68 message_queue_t();
69 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
70
71 public:
72 typedef VoidPointer void_pointer;
73 typedef typename boost::intrusive::
74 pointer_traits<void_pointer>::template
75 rebind_pointer<char>::type char_ptr;
76 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
 //Unsigned counterpart of difference_type, used for every size and count
77 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
78
79 //!Creates a process shared message queue with name "name". For this message queue,
80 //!the maximum number of messages will be "max_num_msg" and the maximum message size
81 //!will be "max_msg_size". Throws on error and if the queue was previously created.
82 message_queue_t(create_only_t create_only,
83 const char *name,
84 size_type max_num_msg,
85 size_type max_msg_size,
86 const permissions &perm = permissions());
87
88 //!Opens or creates a process shared message queue with name "name".
89 //!If the queue is created, the maximum number of messages will be "max_num_msg"
90 //!and the maximum message size will be "max_msg_size". If queue was previously
91 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
92 //!are ignored. Throws on error.
93 message_queue_t(open_or_create_t open_or_create,
94 const char *name,
95 size_type max_num_msg,
96 size_type max_msg_size,
97 const permissions &perm = permissions());
98
99 //!Opens a previously created process shared message queue with name "name".
100 //!If the queue was not previously created or there are no free resources,
101 //!throws an error.
102 message_queue_t(open_only_t open_only,
103 const char *name);
104
105 //!Destroys *this and indicates that the calling process is finished using
106 //!the resource. All opened message queues are still
107 //!valid after destruction. The destructor function will deallocate
108 //!any system resources allocated by the system for use by this process for
109 //!this resource. The resource can still be opened again calling
110 //!the open constructor overload. To erase the message queue from the system
111 //!use remove().
112 ~message_queue_t();
113
114 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
115 //!message queue with priority "priority". If the message queue is full
116 //!the sender is blocked. Throws interprocess_error on error.
117 void send (const void *buffer, size_type buffer_size,
118 unsigned int priority);
119
120 //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
121 //!message queue with priority "priority". If the message queue is full
122 //!the sender is not blocked and returns false, otherwise returns true.
123 //!Throws interprocess_error on error.
124 bool try_send (const void *buffer, size_type buffer_size,
125 unsigned int priority);
126
127 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
128 //!message queue with priority "priority". If the message queue is full
129 //!the sender retries until time "abs_time" is reached. Returns true if
130 //!the message has been successfully sent. Returns false if timeout is reached.
131 //!Throws interprocess_error on error.
132 bool timed_send (const void *buffer, size_type buffer_size,
133 unsigned int priority, const boost::posix_time::ptime& abs_time);
134
135 //!Receives a message from the message queue. The message is stored in buffer
136 //!"buffer", which has size "buffer_size". The received message has size
137 //!"recvd_size" and priority "priority". If the message queue is empty
138 //!the receiver is blocked. Throws interprocess_error on error.
139 void receive (void *buffer, size_type buffer_size,
140 size_type &recvd_size,unsigned int &priority);
141
142 //!Receives a message from the message queue. The message is stored in buffer
143 //!"buffer", which has size "buffer_size". The received message has size
144 //!"recvd_size" and priority "priority". If the message queue is empty
145 //!the receiver is not blocked and returns false, otherwise returns true.
146 //!Throws interprocess_error on error.
147 bool try_receive (void *buffer, size_type buffer_size,
148 size_type &recvd_size,unsigned int &priority);
149
150 //!Receives a message from the message queue. The message is stored in buffer
151 //!"buffer", which has size "buffer_size". The received message has size
152 //!"recvd_size" and priority "priority". If the message queue is empty
153 //!the receiver retries until time "abs_time" is reached. Returns true if
154 //!the message has been successfully received. Returns false if timeout is reached.
155 //!Throws interprocess_error on error.
156 bool timed_receive (void *buffer, size_type buffer_size,
157 size_type &recvd_size,unsigned int &priority,
158 const boost::posix_time::ptime &abs_time);
159
160 //!Returns the maximum number of messages allowed by the queue. The message
161 //!queue must be opened or created previously. Otherwise, returns 0.
162 //!Never throws
163 size_type get_max_msg() const;
164
165 //!Returns the maximum size of message allowed by the queue. The message
166 //!queue must be opened or created previously. Otherwise, returns 0.
167 //!Never throws
168 size_type get_max_msg_size() const;
169
170 //!Returns the number of messages currently stored.
171 //!Never throws
172 size_type get_num_msg() const;
173
174 //!Removes the message queue from the system.
175 //!Returns false on error. Never throws
176 static bool remove(const char *name);
177
178 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
179 private:
180 typedef boost::posix_time::ptime ptime;
181
 //The initialization functor needs access to the private
 //open_create_impl_t typedef declared below
182 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
183
 //Common implementation of receive/try_receive/timed_receive; "block"
 //selects the waiting policy and "abs_time" is the deadline used by
 //the timed mode
184 bool do_receive(block_t block,
185 void *buffer, size_type buffer_size,
186 size_type &recvd_size, unsigned int &priority,
187 const ptime &abs_time);
188
 //Common implementation of send/try_send/timed_send; "block"
 //selects the waiting policy and "abs_time" is the deadline used by
 //the timed mode
189 bool do_send(block_t block,
190 const void *buffer, size_type buffer_size,
191 unsigned int priority, const ptime &abs_time);
192
193 //!Returns the needed memory size for the shared message queue.
194 //!Never throws
195 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
196 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
 //Shared memory segment that holds the queue header, index and messages
197 open_create_impl_t m_shmem;
198 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
199};
200
201#if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
202
203namespace ipcdetail {
204
205//!This header is the prefix of each message in the queue
206template<class VoidPointer>
207class msg_hdr_t
208{
209 typedef VoidPointer void_pointer;
210 typedef typename boost::intrusive::
211 pointer_traits<void_pointer>::template
212 rebind_pointer<char>::type char_ptr;
213 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
214 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
215
216 public:
217 size_type len; // Message length
218 unsigned int priority;// Message priority
219 //!Returns the data buffer associated with this this message
220 void * data(){ return this+1; } //
221};
222
223//!This functor is the predicate to order stored messages by priority
224template<class VoidPointer>
225class priority_functor
226{
227 typedef typename boost::intrusive::
228 pointer_traits<VoidPointer>::template
229 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
230
231 public:
232 bool operator()(const msg_hdr_ptr_t &msg1,
233 const msg_hdr_ptr_t &msg2) const
234 { return msg1->priority < msg2->priority; }
235};
236
237//!This header is placed in the beginning of the shared memory and contains
238//!the data to control the queue. This class initializes the shared memory
239//!in the following way: in ascending memory address with proper alignment
240//!fillings:
241//!
242//!-> mq_hdr_t:
243//! Main control block that controls the rest of the elements
244//!
245//!-> offset_ptr<msg_hdr_t> index [max_num_msg]
246//! An array of pointers with size "max_num_msg" called index. Each pointer
247//! points to a preallocated message. Elements of this array are
248//! reordered in runtime in the following way:
249//!
250//! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
251//!
252//! When the current number of messages is "cur_num_msg", the array
253//! is treated like a circular buffer. Starting from position "cur_first_msg",
254//! the next "cur_num_msg" pointers (taken in a circular way) point to inserted
255//! messages and the rest point to free messages. Those "cur_num_msg" pointers are
256//! ordered by the priority of the pointed message and by insertion order
257//! if two messages have the same priority. So the next message to be
258//! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
259//! and the first free message ready to be used in a "send" operation is
260//! [cur_first_msg] if circular buffer is extended from front,
261//! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
262//!
263//! This transforms the index in a circular buffer with an embedded free
264//! message queue.
265//!
266//! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
267//!
268//! When the current number of messages is "cur_num_msg", the first
269//! "cur_num_msg" pointers point to inserted messages and the rest
270//! point to free messages. The first "cur_num_msg" pointers are
271//! ordered by the priority of the pointed message and by insertion order
272//! if two messages have the same priority. So the next message to be
273//! used in a "receive" is pointed by index [cur_num_msg-1] and the first free
274//! message ready to be used in a "send" operation is index [cur_num_msg].
275//!
276//! This transforms the index in a fixed size priority queue with an embedded free
277//! message queue.
278//!
279//!-> struct message_t
280//! {
281//! msg_hdr_t header;
282//! char[max_msg_size] data;
283//! } messages [max_num_msg];
284//!
285//! An array of buffers of preallocated messages, each one prefixed with the
286//! msg_hdr_t structure. Each of these messages is pointed to by one pointer of
287//! the index structure.
288template<class VoidPointer>
289class mq_hdr_t
290 : public ipcdetail::priority_functor<VoidPointer>
291{
292 typedef VoidPointer void_pointer;
293 typedef msg_hdr_t<void_pointer> msg_header;
 //Pointer to a message header, rebound from the user supplied void pointer
294 typedef typename boost::intrusive::
295 pointer_traits<void_pointer>::template
296 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
297 typedef typename boost::intrusive::pointer_traits
298 <msg_hdr_ptr_t>::difference_type difference_type;
299 typedef typename boost::container::
300 container_detail::make_unsigned<difference_type>::type size_type;
 //Pointer to the array of message header pointers (the index)
301 typedef typename boost::intrusive::
302 pointer_traits<void_pointer>::template
303 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
304 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
305
306 public:
307 //!Constructor. This object must be constructed in the beginning of the
308 //!shared memory of the size returned by the function "get_mem_size".
309 //!This constructor initializes the needed resources and creates
310 //!the internal structures like the priority index. This can throw.
311 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
312 : m_max_num_msg(max_num_msg),
313 m_max_msg_size(max_msg_size),
314 m_cur_num_msg(0)
315 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
316 ,m_cur_first_msg(0u)
317 ,m_blocked_senders(0u)
318 ,m_blocked_receivers(0u)
319 #endif
320 { this->initialize_memory(); }
321
322 //!Returns true if the message queue is full
323 bool is_full() const
324 { return m_cur_num_msg == m_max_num_msg; }
325
326 //!Returns true if the message queue is empty
327 bool is_empty() const
328 { return !m_cur_num_msg; }
329
330 //!Frees the top priority message and saves it in the free message list.
 //!Only the message count is decremented: the slot that held the top
 //!message becomes the first slot past the inserted range.
331 void free_top_msg()
332 { --m_cur_num_msg; }
333
334 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
335
336 typedef msg_hdr_ptr_t *iterator;
337
 //!Returns the index position one past the last inserted message,
 //!wrapping around to the start of the index when the inserted range
 //!reaches the end of the array.
338 size_type end_pos() const
339 {
340 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
341 return space_until_bufend > m_cur_num_msg
342 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
343 }
344
345 //!Returns the inserted message with top priority, which is the one
 //!stored just before end_pos() (the last array slot if end_pos() is 0)
346 msg_header &top_msg()
347 {
348 size_type pos = this->end_pos();
349 return *mp_index[pos ? --pos : m_max_num_msg - 1];
350 }
351
352 //!Returns the inserted message with bottom priority
353 msg_header &bottom_msg()
354 { return *mp_index[m_cur_first_msg]; }
355
 //!Returns an iterator to the first slot of the inserted range
356 iterator inserted_ptr_begin() const
357 { return &mp_index[m_cur_first_msg]; }
358
 //!Returns an iterator to the slot one past the inserted range. When the
 //!range wraps around the array this iterator is lower than the one
 //!returned by inserted_ptr_begin().
359 iterator inserted_ptr_end() const
360 { return &mp_index[this->end_pos()]; }
361
 //!Binary search of the insertion position for "value" in the inserted
 //!range. If the range wraps around the array, the segment from the first
 //!message to the end of the array is searched first and the segment
 //!at the start of the array is searched only if nothing was found there.
362 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
363 {
364 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
365 if(end < begin){
366 iterator idx_end = &mp_index[m_max_num_msg];
367 iterator ret = std::lower_bound(begin, idx_end, value, func);
368 if(idx_end == ret){
369 iterator idx_beg = &mp_index[0];
370 ret = std::lower_bound(idx_beg, end, value, func);
371 //sanity check, these cases should not call lower_bound (optimized out)
372 BOOST_ASSERT(ret != end);
373 BOOST_ASSERT(ret != begin);
374 return ret;
375 }
376 else{
377 return ret;
378 }
379 }
380 else{
381 return std::lower_bound(begin, end, value, func);
382 }
383 }
384
 //!Makes room for a new message at position "where" of the inserted
 //!range, places a free message there and returns its header.
 //!Inserting at the beginning or at the end of the range only updates the
 //!counters; inserting in the middle shifts index pointers towards the
 //!front or towards the back, depending on which side "where" is closer to.
385 msg_header & insert_at(iterator where)
386 {
387 iterator it_inserted_ptr_end = this->inserted_ptr_end();
388 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
389 if(where == it_inserted_ptr_beg){
390 //unsigned integer guarantees underflow
391 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
392 --m_cur_first_msg;
393 ++m_cur_num_msg;
394 return *mp_index[m_cur_first_msg];
395 }
396 else if(where == it_inserted_ptr_end){
397 ++m_cur_num_msg;
398 return **it_inserted_ptr_end;
399 }
400 else{
 //pos is the array position of "where"; circ_pos is its distance
 //from the first inserted message following the circular order
401 size_type pos = where - &mp_index[0];
402 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
403 //Check if it's more efficient to move back or move front
404 if(circ_pos < m_cur_num_msg/2){
405 //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
406 //indicates two step insertion
407 if(!pos){
408 pos = m_max_num_msg;
409 where = &mp_index[m_max_num_msg-1];
410 }
411 else{
412 --where;
413 }
414 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
415 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
416 const size_type first_segment_end = pos;
417 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
418 const size_type second_segment_end = m_max_num_msg;
 //Save the free message pointer stored just before the moved range,
 //as that slot is overwritten by the copies below
419 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
420
421 //First segment
422 if(!unique_segment){
423 std::copy( &mp_index[0] + second_segment_beg
424 , &mp_index[0] + second_segment_end
425 , &mp_index[0] + second_segment_beg - 1);
426 mp_index[m_max_num_msg-1] = mp_index[0];
427 }
428 std::copy( &mp_index[0] + first_segment_beg
429 , &mp_index[0] + first_segment_end
430 , &mp_index[0] + first_segment_beg - 1);
431 *where = backup;
432 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
433 --m_cur_first_msg;
434 ++m_cur_num_msg;
435 return **where;
436 }
437 else{
438 //The queue can't be full so end_pos < m_cur_first_msg
439 //indicates two step insertion
440 const size_type pos_end = this->end_pos();
441 const bool unique_segment = pos < pos_end;
442 const size_type first_segment_beg = pos;
443 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
444 const size_type second_segment_beg = 0u;
445 const size_type second_segment_end = unique_segment ? 0u : pos_end;
 //Save the free message pointer stored at the end of the inserted
 //range, as that slot is overwritten by the copies below
446 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
447
448 //First segment
449 if(!unique_segment){
450 std::copy_backward( &mp_index[0] + second_segment_beg
451 , &mp_index[0] + second_segment_end
452 , &mp_index[0] + second_segment_end + 1);
453 mp_index[0] = mp_index[m_max_num_msg-1];
454 }
455 std::copy_backward( &mp_index[0] + first_segment_beg
456 , &mp_index[0] + first_segment_end
457 , &mp_index[0] + first_segment_end + 1);
458 *where = backup;
459 ++m_cur_num_msg;
460 return **where;
461 }
462 }
463 }
464
465 #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
466
467 typedef msg_hdr_ptr_t *iterator;
468
469 //!Returns the inserted message with top priority
470 msg_header &top_msg()
471 { return *mp_index[m_cur_num_msg-1]; }
472
473 //!Returns the inserted message with bottom priority
474 msg_header &bottom_msg()
475 { return *mp_index[0]; }
476
 //!Returns an iterator to the first slot of the inserted range
477 iterator inserted_ptr_begin() const
478 { return &mp_index[0]; }
479
 //!Returns an iterator to the slot one past the inserted range, which
 //!holds the pointer to the first free message
480 iterator inserted_ptr_end() const
481 { return &mp_index[m_cur_num_msg]; }
482
 //!Binary search of the insertion position for "value" in the inserted range
483 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
484 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
485
 //!Makes room for a new message at position "pos" of the inserted range:
 //!the pointer to the first free message is saved, the pointers in
 //![pos, end) are shifted one slot up and the saved pointer is stored
 //!in "pos". Returns the header of that message.
486 msg_header & insert_at(iterator pos)
487 {
488 const msg_hdr_ptr_t backup = *inserted_ptr_end();
489 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
490 *pos = backup;
491 ++m_cur_num_msg;
492 return **pos;
493 }
494
495 #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
496
497 //!Inserts the first free message in the priority queue, in the position
 //!that corresponds to "priority", and returns its header. A message whose
 //!priority is not higher than the bottom priority (or any message if the
 //!queue is empty) is inserted at the beginning of the inserted range.
498 msg_header & queue_free_msg(unsigned int priority)
499 {
500 //Get priority queue's range
501 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
502 //Optimize for non-priority usage
503 if(m_cur_num_msg && priority > this->bottom_msg().priority){
504 //Check for higher priority than all stored messages
505 if(priority > this->top_msg().priority){
506 it = it_end;
507 }
508 else{
509 //Since we don't know which free message we will pick
510 //build a dummy header for searches
511 msg_header dummy_hdr;
512 dummy_hdr.priority = priority;
513
514 //Get free msg
515 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
516
517 //Check where the free message should be placed
518 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
519 }
520 }
521 //Insert the free message in the correct position
522 return this->insert_at(it);
523 }
524
525 //!Returns the number of bytes needed to construct a message queue with
526 //!"max_num_msg" maximum number of messages and "max_msg_size" maximum
527 //!message size. Never throws.
 //!The result adds the rounded size of this header, the rounded size of
 //!the index, the size of all the preallocated messages (header plus
 //!rounded payload each) and open_create_impl_t::ManagedOpenOrCreateUserOffset.
528 static size_type get_mem_size
529 (size_type max_msg_size, size_type max_num_msg)
530 {
531 const size_type
532 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
533 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
534 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
535 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
536 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
537 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
538 open_create_impl_t::ManagedOpenOrCreateUserOffset;
539 }
540
541 //!Initializes the memory structures to preallocate messages and constructs the
542 //!message index. Never throws.
 //!The sizes are computed with the same formulas used by get_mem_size.
543 void initialize_memory()
544 {
545 const size_type
546 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
547 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
548 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
549 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
550 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
551
552 //Pointer to the index, placed right after this (rounded) header
553 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*>
554 (reinterpret_cast<char*>(this)+r_hdr_size);
555
556 //Pointer to the first message header, placed right after the index
557 msg_header *msg_hdr = reinterpret_cast<msg_header*>
558 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
559
560 //Initialize the pointer to the index
561 mp_index = index;
562
563 //Initialize the index so each slot points to a preallocated message
564 for(size_type i = 0; i < m_max_num_msg; ++i){
565 index[i] = msg_hdr;
566 msg_hdr = reinterpret_cast<msg_header*>
567 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
568 }
569 }
570
571 public:
572 //Pointer to the index
573 msg_hdr_ptr_ptr_t mp_index;
574 //Maximum number of messages of the queue
575 const size_type m_max_num_msg;
576 //Maximum size of messages of the queue
577 const size_type m_max_msg_size;
578 //Current number of messages
579 size_type m_cur_num_msg;
580 //Mutex to protect data structures
581 interprocess_mutex m_mutex;
582 //Condition block receivers when there are no messages
583 interprocess_condition m_cond_recv;
584 //Condition block senders when the queue is full
585 interprocess_condition m_cond_send;
586 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
587 //Current start offset in the circular index
588 size_type m_cur_first_msg;
 //Number of senders currently executing the blocking logic of a full queue
589 size_type m_blocked_senders;
 //Number of receivers currently executing the blocking logic of an empty queue
590 size_type m_blocked_receivers;
591 #endif
592};
593
594
595//!This is the atomic functor to be executed when creating or opening
596//!shared memory. Never throws
597template<class VoidPointer>
598class msg_queue_initialization_func_t
599{
600 public:
601 typedef typename boost::intrusive::
602 pointer_traits<VoidPointer>::template
603 rebind_pointer<char>::type char_ptr;
604 typedef typename boost::intrusive::pointer_traits<char_ptr>::
605 difference_type difference_type;
606 typedef typename boost::container::container_detail::
607 make_unsigned<difference_type>::type size_type;
608
609 msg_queue_initialization_func_t(size_type maxmsg = 0,
610 size_type maxmsgsize = 0)
611 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
612
613 bool operator()(void *address, size_type, bool created)
614 {
615 char *mptr;
616
617 if(created){
618 mptr = reinterpret_cast<char*>(address);
619 //Construct the message queue header at the beginning
620 BOOST_TRY{
621 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
622 }
623 BOOST_CATCH(...){
624 return false;
625 }
626 BOOST_CATCH_END
627 }
628 return true;
629 }
630
631 std::size_t get_min_size() const
632 {
633 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
634 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
635 }
636
637 const size_type m_maxmsg;
638 const size_type m_maxmsgsize;
639};
640
641} //namespace ipcdetail {
642
643template<class VoidPointer>
644inline message_queue_t<VoidPointer>::~message_queue_t()
645{}
646
647template<class VoidPointer>
648inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
649 (size_type max_msg_size, size_type max_num_msg)
650{ return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
651
652template<class VoidPointer>
653inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
654 const char *name,
655 size_type max_num_msg,
656 size_type max_msg_size,
657 const permissions &perm)
658 //Create shared memory and execute functor atomically
659 : m_shmem(create_only,
660 name,
661 get_mem_size(max_msg_size, max_num_msg),
662 read_write,
663 static_cast<void*>(0),
664 //Prepare initialization functor
665 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
666 perm)
667{}
668
669template<class VoidPointer>
670inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
671 const char *name,
672 size_type max_num_msg,
673 size_type max_msg_size,
674 const permissions &perm)
675 //Create shared memory and execute functor atomically
676 : m_shmem(open_or_create,
677 name,
678 get_mem_size(max_msg_size, max_num_msg),
679 read_write,
680 static_cast<void*>(0),
681 //Prepare initialization functor
682 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
683 perm)
684{}
685
686template<class VoidPointer>
687inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
688 //Create shared memory and execute functor atomically
689 : m_shmem(open_only,
690 name,
691 read_write,
692 static_cast<void*>(0),
693 //Prepare initialization functor
694 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
695{}
696
697template<class VoidPointer>
698inline void message_queue_t<VoidPointer>::send
699 (const void *buffer, size_type buffer_size, unsigned int priority)
700{ this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
701
702template<class VoidPointer>
703inline bool message_queue_t<VoidPointer>::try_send
704 (const void *buffer, size_type buffer_size, unsigned int priority)
705{ return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
706
707template<class VoidPointer>
708inline bool message_queue_t<VoidPointer>::timed_send
709 (const void *buffer, size_type buffer_size
710 ,unsigned int priority, const boost::posix_time::ptime &abs_time)
711{
712 if(abs_time == boost::posix_time::pos_infin){
713 this->send(buffer, buffer_size, priority);
714 return true;
715 }
716 return this->do_send(timed, buffer, buffer_size, priority, abs_time);
717}
718
719template<class VoidPointer>
720inline bool message_queue_t<VoidPointer>::do_send(block_t block,
721 const void *buffer, size_type buffer_size,
722 unsigned int priority, const boost::posix_time::ptime &abs_time)
723{
724 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
725 //Check if buffer is smaller than maximum allowed
726 if (buffer_size > p_hdr->m_max_msg_size) {
727 throw interprocess_exception(size_error);
728 }
729
730 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
731 bool notify_blocked_receivers = false;
732 #endif
733 //---------------------------------------------
734 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
735 //---------------------------------------------
736 {
737 //If the queue is full execute blocking logic
738 if (p_hdr->is_full()) {
739 BOOST_TRY{
740 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
741 ++p_hdr->m_blocked_senders;
742 #endif
743 switch(block){
744 case non_blocking :
745 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
746 --p_hdr->m_blocked_senders;
747 #endif
748 return false;
749 break;
750
751 case blocking :
752 do{
753 p_hdr->m_cond_send.wait(lock);
754 }
755 while (p_hdr->is_full());
756 break;
757
758 case timed :
759 do{
760 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
761 if(p_hdr->is_full()){
762 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
763 --p_hdr->m_blocked_senders;
764 #endif
765 return false;
766 }
767 break;
768 }
769 }
770 while (p_hdr->is_full());
771 break;
772 default:
773 break;
774 }
775 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
776 --p_hdr->m_blocked_senders;
777 #endif
778 }
779 BOOST_CATCH(...){
780 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
781 --p_hdr->m_blocked_senders;
782 #endif
783 BOOST_RETHROW;
784 }
785 BOOST_CATCH_END
786 }
787
788 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
789 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
790 #endif
791 //Insert the first free message in the priority queue
792 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
793
794 //Sanity check, free msgs are always cleaned when received
795 BOOST_ASSERT(free_msg_hdr.priority == 0);
796 BOOST_ASSERT(free_msg_hdr.len == 0);
797
798 //Copy control data to the free message
799 free_msg_hdr.priority = priority;
800 free_msg_hdr.len = buffer_size;
801
802 //Copy user buffer to the message
803 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
804 } // Lock end
805
806 //Notify outside lock to avoid contention. This might produce some
807 //spurious wakeups, but it's usually far better than notifying inside.
808 //If this message changes the queue empty state, notify it to receivers
809 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
810 if (notify_blocked_receivers){
811 p_hdr->m_cond_recv.notify_one();
812 }
813 #else
814 p_hdr->m_cond_recv.notify_one();
815 #endif
816
817 return true;
818}
819
820template<class VoidPointer>
821inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
822 size_type &recvd_size, unsigned int &priority)
823{ this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
824
825template<class VoidPointer>
826inline bool
827 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
828 size_type &recvd_size, unsigned int &priority)
829{ return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
830
831template<class VoidPointer>
832inline bool
833 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
834 size_type &recvd_size, unsigned int &priority,
835 const boost::posix_time::ptime &abs_time)
836{
837 if(abs_time == boost::posix_time::pos_infin){
838 this->receive(buffer, buffer_size, recvd_size, priority);
839 return true;
840 }
841 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
842}
843
844template<class VoidPointer>
845inline bool
846 message_queue_t<VoidPointer>::do_receive(block_t block,
847 void *buffer, size_type buffer_size,
848 size_type &recvd_size, unsigned int &priority,
849 const boost::posix_time::ptime &abs_time)
850{
851 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
852 //Check if buffer is big enough for any message
853 if (buffer_size < p_hdr->m_max_msg_size) {
854 throw interprocess_exception(size_error);
855 }
856
857 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
858 bool notify_blocked_senders = false;
859 #endif
860 //---------------------------------------------
861 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
862 //---------------------------------------------
863 {
864 //If there are no messages execute blocking logic
865 if (p_hdr->is_empty()) {
866 BOOST_TRY{
867 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
868 ++p_hdr->m_blocked_receivers;
869 #endif
870 switch(block){
871 case non_blocking :
872 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
873 --p_hdr->m_blocked_receivers;
874 #endif
875 return false;
876 break;
877
878 case blocking :
879 do{
880 p_hdr->m_cond_recv.wait(lock);
881 }
882 while (p_hdr->is_empty());
883 break;
884
885 case timed :
886 do{
887 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
888 if(p_hdr->is_empty()){
889 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
890 --p_hdr->m_blocked_receivers;
891 #endif
892 return false;
893 }
894 break;
895 }
896 }
897 while (p_hdr->is_empty());
898 break;
899
900 //Paranoia check
901 default:
902 break;
903 }
904 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
905 --p_hdr->m_blocked_receivers;
906 #endif
907 }
908 BOOST_CATCH(...){
909 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
910 --p_hdr->m_blocked_receivers;
911 #endif
912 BOOST_RETHROW;
913 }
914 BOOST_CATCH_END
915 }
916
917 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
918 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
919 #endif
920
921 //There is at least one message ready to pick, get the top one
922 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
923
924 //Get data from the message
925 recvd_size = top_msg.len;
926 priority = top_msg.priority;
927
928 //Some cleanup to ease debugging
929 top_msg.len = 0;
930 top_msg.priority = 0;
931
932 //Copy data to receiver's bufers
933 std::memcpy(buffer, top_msg.data(), recvd_size);
934
935 //Free top message and put it in the free message list
936 p_hdr->free_top_msg();
937 } //Lock end
938
939 //Notify outside lock to avoid contention. This might produce some
940 //spurious wakeups, but it's usually far better than notifying inside.
941 //If this reception changes the queue full state, notify senders
942 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
943 if (notify_blocked_senders){
944 p_hdr->m_cond_send.notify_one();
945 }
946 #else
947 p_hdr->m_cond_send.notify_one();
948 #endif
949
950 return true;
951}
952
953template<class VoidPointer>
954inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
955{
956 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
957 return p_hdr ? p_hdr->m_max_num_msg : 0; }
958
959template<class VoidPointer>
960inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
961{
962 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
963 return p_hdr ? p_hdr->m_max_msg_size : 0;
964}
965
966template<class VoidPointer>
967inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
968{
969 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
970 if(p_hdr){
971 //---------------------------------------------
972 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
973 //---------------------------------------------
974 return p_hdr->m_cur_num_msg;
975 }
976
977 return 0;
978}
979
980template<class VoidPointer>
981inline bool message_queue_t<VoidPointer>::remove(const char *name)
982{ return shared_memory_object::remove(name); }
983
984#else
985
986//!Typedef for a default message queue
987//!to be used between processes
988typedef message_queue_t<offset_ptr<void> > message_queue;
989
990#endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
991
992}} //namespace boost{ namespace interprocess{
993
994#include <boost/interprocess/detail/config_end.hpp>
995
996#endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP