1 //////////////////////////////////////////////////////////////////////////////
3 // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // See http://www.boost.org/libs/interprocess for documentation.
9 //////////////////////////////////////////////////////////////////////////////
11 #include <boost/interprocess/ipc/message_queue.hpp>
12 #include <boost/interprocess/managed_external_buffer.hpp>
13 #include <boost/interprocess/managed_heap_memory.hpp>
14 #include <boost/interprocess/containers/map.hpp>
15 #include <boost/interprocess/containers/set.hpp>
16 #include <boost/interprocess/allocators/node_allocator.hpp>
17 #include <boost/interprocess/detail/os_thread_functions.hpp>
19 #include <boost/intrusive/detail/minimal_pair_header.hpp>
20 #include <boost/intrusive/detail/minimal_less_equal_header.hpp>
22 #include <boost/move/unique_ptr.hpp>
31 #include "get_process_id_name.hpp"
32 #include "named_creation_template.hpp"
34 ////////////////////////////////////////////////////////////////////////////////
36 // This example tests the process shared message queue. //
38 ////////////////////////////////////////////////////////////////////////////////
40 using namespace boost::interprocess
;
42 //This test inserts messages with different priority and marks them with a
43 //time-stamp to check if receiver obtains highest priority messages first and
44 //messages with same priority are received in fifo order
45 bool test_priority_order()
47 message_queue::remove(test::get_process_id_name());
50 (open_or_create
, test::get_process_id_name(), 100, sizeof(std::size_t)),
52 (open_or_create
, test::get_process_id_name(), 100, sizeof(std::size_t));
54 //We test that the queue is ordered by priority and in the
55 //same priority, is a FIFO
56 message_queue::size_type recvd
= 0;
57 unsigned int priority
= 0;
59 unsigned int priority_prev
;
60 std::size_t tstamp_prev
;
62 //We will send 100 message with priority 0-9
63 //The message will contain the timestamp of the message
64 for(std::size_t i
= 0; i
< 100; ++i
){
66 mq1
.send(&tstamp
, sizeof(tstamp
), (unsigned int)(i
%10));
69 priority_prev
= (std::numeric_limits
<unsigned int>::max
)();
72 //Receive all messages and test those are ordered
73 //by priority and by FIFO in the same priority
74 for(std::size_t i
= 0; i
< 100; ++i
){
75 mq1
.receive(&tstamp
, sizeof(tstamp
), recvd
, priority
);
76 if(priority
> priority_prev
)
78 if(priority
== priority_prev
&&
79 tstamp
<= tstamp_prev
){
82 priority_prev
= priority
;
86 //Now retry it with different priority order
87 for(std::size_t i
= 0; i
< 100; ++i
){
89 mq1
.send(&tstamp
, sizeof(tstamp
), (unsigned int)(9 - i
%10));
92 priority_prev
= (std::numeric_limits
<unsigned int>::max
)();
95 //Receive all messages and test those are ordered
96 //by priority and by FIFO in the same priority
97 for(std::size_t i
= 0; i
< 100; ++i
){
98 mq1
.receive(&tstamp
, sizeof(tstamp
), recvd
, priority
);
99 if(priority
> priority_prev
)
101 if(priority
== priority_prev
&&
102 tstamp
<= tstamp_prev
){
105 priority_prev
= priority
;
106 tstamp_prev
= tstamp
;
109 message_queue::remove(test::get_process_id_name());
113 //[message_queue_test_test_serialize_db
114 //This test creates a in memory data-base using Interprocess machinery and
115 //serializes it through a message queue. Then rebuilds the data-base in
116 //another buffer and checks it against the original data-base
117 bool test_serialize_db()
119 //Typedef data to create a Interprocess map
120 typedef std::pair
<const std::size_t, std::size_t> MyPair
;
121 typedef std::less
<std::size_t> MyLess
;
122 typedef node_allocator
<MyPair
, managed_external_buffer::segment_manager
>
124 typedef map
<std::size_t,
126 std::less
<std::size_t>,
131 const std::size_t BufferSize
= 65536;
132 const std::size_t MaxMsgSize
= 100;
134 //Allocate a memory buffer to hold the destiny database using vector<char>
135 std::vector
<char> buffer_destiny(BufferSize
, 0);
137 message_queue::remove(test::get_process_id_name());
139 //Create the message-queues
140 message_queue
mq1(create_only
, test::get_process_id_name(), 1, MaxMsgSize
);
142 //Open previously created message-queue simulating other process
143 message_queue
mq2(open_only
, test::get_process_id_name());
145 //A managed heap memory to create the origin database
146 managed_heap_memory
db_origin(buffer_destiny
.size());
148 //Construct the map in the first buffer
149 MyMap
*map1
= db_origin
.construct
<MyMap
>("MyMap")
151 db_origin
.get_segment_manager());
155 //Fill map1 until is full
163 BOOST_CATCH(boost::interprocess::bad_alloc
&){} BOOST_CATCH_END
165 //Data control data sending through the message queue
166 std::size_t sent
= 0;
167 message_queue::size_type recvd
= 0;
168 message_queue::size_type total_recvd
= 0;
169 unsigned int priority
;
171 //Send whole first buffer through the mq1, read it
172 //through mq2 to the second buffer
174 //Send a fragment of buffer1 through mq1
175 std::size_t bytes_to_send
= MaxMsgSize
< (db_origin
.get_size() - sent
) ?
176 MaxMsgSize
: (db_origin
.get_size() - sent
);
177 mq1
.send( &static_cast<char*>(db_origin
.get_address())[sent
]
180 sent
+= bytes_to_send
;
181 //Receive the fragment through mq2 to buffer_destiny
182 mq2
.receive( &buffer_destiny
[total_recvd
]
186 total_recvd
+= recvd
;
188 //Check if we have received all the buffer
189 if(total_recvd
== BufferSize
){
194 //The buffer will contain a copy of the original database
195 //so let's interpret the buffer with managed_external_buffer
196 managed_external_buffer
db_destiny(open_only
, &buffer_destiny
[0], BufferSize
);
199 std::pair
<MyMap
*, managed_external_buffer::size_type
> ret
= db_destiny
.find
<MyMap
>("MyMap");
200 MyMap
*map2
= ret
.first
;
202 //Check if we have found it
207 //Check if it is a single variable (not an array)
212 //Now let's compare size
213 if(map1
->size() != map2
->size()){
217 //Now let's compare all db values
218 MyMap::size_type num_elements
= map1
->size();
219 for(std::size_t i
= 0; i
< num_elements
; ++i
){
220 if((*map1
)[i
] != (*map2
)[i
]){
225 //Destroy maps from db-s
226 db_origin
.destroy_ptr(map1
);
227 db_destiny
.destroy_ptr(map2
);
229 message_queue::remove(test::get_process_id_name());
234 static const int MsgSize
= 10;
235 static const int NumMsg
= 1000;
236 static char msgsend
[10];
237 static char msgrecv
[10];
239 static boost::interprocess::message_queue
*pmessage_queue
;
243 boost::interprocess::message_queue::size_type recvd_size
;
244 unsigned int priority
;
248 pmessage_queue
->receive(msgrecv
, MsgSize
, recvd_size
, priority
);
252 bool test_buffer_overflow()
254 boost::interprocess::message_queue::remove(test::get_process_id_name());
256 boost::movelib::unique_ptr
<boost::interprocess::message_queue
>
257 ptr(new boost::interprocess::message_queue
258 (create_only
, test::get_process_id_name(), 10, 10));
259 pmessage_queue
= ptr
.get();
261 //Launch the receiver thread
262 boost::interprocess::ipcdetail::OS_thread_t thread
;
263 boost::interprocess::ipcdetail::thread_launch(thread
, &receiver
);
264 boost::interprocess::ipcdetail::thread_yield();
269 pmessage_queue
->send(msgsend
, MsgSize
, 0);
272 boost::interprocess::ipcdetail::thread_join(thread
);
274 boost::interprocess::message_queue::remove(test::get_process_id_name());
279 //////////////////////////////////////////////////////////////////////////////
281 // test_multi_sender_receiver is based on Alexander (aalutov's)
282 // testcase for ticket #9221. Many thanks.
284 //////////////////////////////////////////////////////////////////////////////
286 static boost::interprocess::message_queue
*global_queue
= 0;
287 //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
288 static const int MULTI_NUM_MSG_PER_SENDER
= 10000;
289 //Message queue message capacity
290 static const int MULTI_QUEUE_SIZE
= (MULTI_NUM_MSG_PER_SENDER
- 1)/MULTI_NUM_MSG_PER_SENDER
+ 1;
291 //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
292 static const std::size_t MULTI_THREAD_COUNT
= 10;
294 static void multisend()
297 for (std::size_t i
= 0; i
< MULTI_NUM_MSG_PER_SENDER
; i
++) {
298 global_queue
->send(&buff
, 1, 0);
300 global_queue
->send(&buff
, 0, 0);
301 //std::cout<<"writer thread complete"<<std::endl;
304 static void multireceive()
308 int received_msgs
= 0;
309 unsigned int priority
;
311 global_queue
->receive(&buff
, 1, size
, priority
);
315 //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
319 bool test_multi_sender_receiver()
322 //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
324 boost::interprocess::message_queue::remove(test::get_process_id_name());
325 boost::interprocess::message_queue mq
326 (boost::interprocess::open_or_create
, test::get_process_id_name(), MULTI_QUEUE_SIZE
, 1);
328 std::vector
<boost::interprocess::ipcdetail::OS_thread_t
> threads(MULTI_THREAD_COUNT
*2);
330 //Launch senders receiver thread
331 for (std::size_t i
= 0; i
< MULTI_THREAD_COUNT
; i
++) {
332 boost::interprocess::ipcdetail::thread_launch
333 (threads
[i
], &multisend
);
336 for (std::size_t i
= 0; i
< MULTI_THREAD_COUNT
; i
++) {
337 boost::interprocess::ipcdetail::thread_launch
338 (threads
[MULTI_THREAD_COUNT
+i
], &multireceive
);
341 for (std::size_t i
= 0; i
< MULTI_THREAD_COUNT
*2; i
++) {
342 boost::interprocess::ipcdetail::thread_join(threads
[i
]);
343 //std::cout << "Joined thread " << i << std::endl;
346 BOOST_CATCH(std::exception
&e
) {
347 std::cout
<< "error " << e
.what() << std::endl
;
350 boost::interprocess::message_queue::remove(test::get_process_id_name());
354 class msg_queue_named_test_wrapper
355 : public test::named_sync_deleter
<message_queue
>, public message_queue
359 msg_queue_named_test_wrapper(create_only_t
)
360 : message_queue(create_only
, test::get_process_id_name(), 10, 10)
363 msg_queue_named_test_wrapper(open_only_t
)
364 : message_queue(open_only
, test::get_process_id_name())
367 msg_queue_named_test_wrapper(open_or_create_t
)
368 : message_queue(open_or_create
, test::get_process_id_name(), 10, 10)
371 ~msg_queue_named_test_wrapper()
375 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES)
377 class msg_queue_named_test_wrapper_w
378 : public test::named_sync_deleter_w
<message_queue
>, public message_queue
382 template <class CharT
>
383 msg_queue_named_test_wrapper_w(create_only_t
)
384 : message_queue(create_only
, test::get_process_id_wname(), 10, 10)
387 msg_queue_named_test_wrapper_w(open_only_t
)
388 : message_queue(open_only
, test::get_process_id_wname())
391 msg_queue_named_test_wrapper_w(open_or_create_t
)
392 : message_queue(open_or_create
, test::get_process_id_wname(), 10, 10)
395 ~msg_queue_named_test_wrapper_w()
399 #endif //defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES)
406 message_queue::remove(test::get_process_id_name());
407 test::test_named_creation
<msg_queue_named_test_wrapper
>();
408 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES)
409 test::test_named_creation
<msg_queue_named_test_wrapper
>();
412 if(!test_priority_order()){
416 if(!test_serialize_db()){
420 if(!test_buffer_overflow()){
424 if(!test_multi_sender_receiver()){
428 BOOST_CATCH(std::exception
&ex
) {
429 std::cout
<< ex
.what() << std::endl
;
433 message_queue::remove(test::get_process_id_name());