]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/interprocess/test/message_queue_test.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / interprocess / test / message_queue_test.cpp
CommitLineData
7c673cae
FG
1//////////////////////////////////////////////////////////////////////////////
2//
3// (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
4// Software License, Version 1.0. (See accompanying file
5// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// See http://www.boost.org/libs/interprocess for documentation.
8//
9//////////////////////////////////////////////////////////////////////////////
10
11#include <boost/interprocess/detail/config_begin.hpp>
12#include <boost/interprocess/ipc/message_queue.hpp>
13#include <boost/interprocess/managed_external_buffer.hpp>
14#include <boost/interprocess/managed_heap_memory.hpp>
15#include <boost/interprocess/containers/map.hpp>
16#include <boost/interprocess/containers/set.hpp>
17#include <boost/interprocess/allocators/node_allocator.hpp>
18#include <boost/interprocess/detail/os_thread_functions.hpp>
19// intrusive/detail
20#include <boost/intrusive/detail/minimal_pair_header.hpp>
21#include <boost/intrusive/detail/minimal_less_equal_header.hpp>
22
23#include <cstddef>
24#include <memory>
25#include <iostream>
26#include <vector>
27#include <stdexcept>
28#include <limits>
29
30#include "get_process_id_name.hpp"
31
32////////////////////////////////////////////////////////////////////////////////
33// //
34// This example tests the process shared message queue. //
35// //
36////////////////////////////////////////////////////////////////////////////////
37
38using namespace boost::interprocess;
39
40//This test inserts messages with different priority and marks them with a
41//time-stamp to check if receiver obtains highest priority messages first and
42//messages with same priority are received in fifo order
43bool test_priority_order()
44{
45 message_queue::remove(test::get_process_id_name());
46 {
47 message_queue mq1
48 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
49 mq2
50 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
51
52 //We test that the queue is ordered by priority and in the
53 //same priority, is a FIFO
54 message_queue::size_type recvd = 0;
55 unsigned int priority = 0;
56 std::size_t tstamp;
57 unsigned int priority_prev;
58 std::size_t tstamp_prev;
59
60 //We will send 100 message with priority 0-9
61 //The message will contain the timestamp of the message
62 for(std::size_t i = 0; i < 100; ++i){
63 tstamp = i;
64 mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
65 }
66
67 priority_prev = (std::numeric_limits<unsigned int>::max)();
68 tstamp_prev = 0;
69
70 //Receive all messages and test those are ordered
71 //by priority and by FIFO in the same priority
72 for(std::size_t i = 0; i < 100; ++i){
73 mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
74 if(priority > priority_prev)
75 return false;
76 if(priority == priority_prev &&
77 tstamp <= tstamp_prev){
78 return false;
79 }
80 priority_prev = priority;
81 tstamp_prev = tstamp;
82 }
83
84 //Now retry it with different priority order
85 for(std::size_t i = 0; i < 100; ++i){
86 tstamp = i;
87 mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
88 }
89
90 priority_prev = (std::numeric_limits<unsigned int>::max)();
91 tstamp_prev = 0;
92
93 //Receive all messages and test those are ordered
94 //by priority and by FIFO in the same priority
95 for(std::size_t i = 0; i < 100; ++i){
96 mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
97 if(priority > priority_prev)
98 return false;
99 if(priority == priority_prev &&
100 tstamp <= tstamp_prev){
101 return false;
102 }
103 priority_prev = priority;
104 tstamp_prev = tstamp;
105 }
106 }
107 message_queue::remove(test::get_process_id_name());
108 return true;
109}
110
111//[message_queue_test_test_serialize_db
112//This test creates a in memory data-base using Interprocess machinery and
113//serializes it through a message queue. Then rebuilds the data-base in
114//another buffer and checks it against the original data-base
115bool test_serialize_db()
116{
117 //Typedef data to create a Interprocess map
118 typedef std::pair<const std::size_t, std::size_t> MyPair;
119 typedef std::less<std::size_t> MyLess;
120 typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
121 node_allocator_t;
122 typedef map<std::size_t,
123 std::size_t,
124 std::less<std::size_t>,
125 node_allocator_t>
126 MyMap;
127
128 //Some constants
129 const std::size_t BufferSize = 65536;
130 const std::size_t MaxMsgSize = 100;
131
132 //Allocate a memory buffer to hold the destiny database using vector<char>
133 std::vector<char> buffer_destiny(BufferSize, 0);
134
135 message_queue::remove(test::get_process_id_name());
136 {
137 //Create the message-queues
138 message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
139
140 //Open previously created message-queue simulating other process
141 message_queue mq2(open_only, test::get_process_id_name());
142
143 //A managed heap memory to create the origin database
144 managed_heap_memory db_origin(buffer_destiny.size());
145
146 //Construct the map in the first buffer
147 MyMap *map1 = db_origin.construct<MyMap>("MyMap")
148 (MyLess(),
149 db_origin.get_segment_manager());
150 if(!map1)
151 return false;
152
153 //Fill map1 until is full
154 try{
155 std::size_t i = 0;
156 while(1){
157 (*map1)[i] = i;
158 ++i;
159 }
160 }
161 catch(boost::interprocess::bad_alloc &){}
162
163 //Data control data sending through the message queue
164 std::size_t sent = 0;
165 message_queue::size_type recvd = 0;
166 message_queue::size_type total_recvd = 0;
167 unsigned int priority;
168
169 //Send whole first buffer through the mq1, read it
170 //through mq2 to the second buffer
171 while(1){
172 //Send a fragment of buffer1 through mq1
173 std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
174 MaxMsgSize : (db_origin.get_size() - sent);
175 mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
176 , bytes_to_send
177 , 0);
178 sent += bytes_to_send;
179 //Receive the fragment through mq2 to buffer_destiny
180 mq2.receive( &buffer_destiny[total_recvd]
181 , BufferSize - recvd
182 , recvd
183 , priority);
184 total_recvd += recvd;
185
186 //Check if we have received all the buffer
187 if(total_recvd == BufferSize){
188 break;
189 }
190 }
191
192 //The buffer will contain a copy of the original database
193 //so let's interpret the buffer with managed_external_buffer
194 managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
195
196 //Let's find the map
197 std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
198 MyMap *map2 = ret.first;
199
200 //Check if we have found it
201 if(!map2){
202 return false;
203 }
204
205 //Check if it is a single variable (not an array)
206 if(ret.second != 1){
207 return false;
208 }
209
210 //Now let's compare size
211 if(map1->size() != map2->size()){
212 return false;
213 }
214
215 //Now let's compare all db values
216 MyMap::size_type num_elements = map1->size();
217 for(std::size_t i = 0; i < num_elements; ++i){
218 if((*map1)[i] != (*map2)[i]){
219 return false;
220 }
221 }
222
223 //Destroy maps from db-s
224 db_origin.destroy_ptr(map1);
225 db_destiny.destroy_ptr(map2);
226 }
227 message_queue::remove(test::get_process_id_name());
228 return true;
229}
230//]
231
232static const int MsgSize = 10;
233static const int NumMsg = 1000;
234static char msgsend [10];
235static char msgrecv [10];
236
237static boost::interprocess::message_queue *pmessage_queue;
238
239void receiver()
240{
241 boost::interprocess::message_queue::size_type recvd_size;
242 unsigned int priority;
243 int nummsg = NumMsg;
244
245 while(nummsg--){
246 pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
247 }
248}
249
250bool test_buffer_overflow()
251{
252 boost::interprocess::message_queue::remove(test::get_process_id_name());
253 {
254 std::auto_ptr<boost::interprocess::message_queue>
255 ptr(new boost::interprocess::message_queue
256 (create_only, test::get_process_id_name(), 10, 10));
257 pmessage_queue = ptr.get();
258
259 //Launch the receiver thread
260 boost::interprocess::ipcdetail::OS_thread_t thread;
261 boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
262 boost::interprocess::ipcdetail::thread_yield();
263
264 int nummsg = NumMsg;
265
266 while(nummsg--){
267 pmessage_queue->send(msgsend, MsgSize, 0);
268 }
269
270 boost::interprocess::ipcdetail::thread_join(thread);
271 }
272 boost::interprocess::message_queue::remove(test::get_process_id_name());
273 return true;
274}
275
276
277//////////////////////////////////////////////////////////////////////////////
278//
279// test_multi_sender_receiver is based on Alexander (aalutov's)
280// testcase for ticket #9221. Many thanks.
281//
282//////////////////////////////////////////////////////////////////////////////
283
284static boost::interprocess::message_queue *global_queue = 0;
285//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
286static const int MULTI_NUM_MSG_PER_SENDER = 10000;
287//Message queue message capacity
288static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
289//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
290static const int MULTI_THREAD_COUNT = 10;
291
292static void multisend()
293{
294 char buff;
295 for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
296 global_queue->send(&buff, 1, 0);
297 }
298 global_queue->send(&buff, 0, 0);
299 //std::cout<<"writer thread complete"<<std::endl;
300}
301
302static void multireceive()
303{
304 char buff;
305 size_t size;
306 int received_msgs = 0;
307 unsigned int priority;
308 do {
309 global_queue->receive(&buff, 1, size, priority);
310 ++received_msgs;
311 } while (size > 0);
312 --received_msgs;
313 //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
314}
315
316
317bool test_multi_sender_receiver()
318{
319 bool ret = true;
320 //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
321 try {
322 boost::interprocess::message_queue::remove(test::get_process_id_name());
323 boost::interprocess::message_queue mq
324 (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
325 global_queue = &mq;
326 std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
327
328 //Launch senders receiver thread
329 for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
330 boost::interprocess::ipcdetail::thread_launch
331 (threads[i], &multisend);
332 }
333
334 for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
335 boost::interprocess::ipcdetail::thread_launch
336 (threads[MULTI_THREAD_COUNT+i], &multireceive);
337 }
338
339 for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
340 boost::interprocess::ipcdetail::thread_join(threads[i]);
341 //std::cout << "Joined thread " << i << std::endl;
342 }
343 }
344 catch (std::exception &e) {
345 std::cout << "error " << e.what() << std::endl;
346 ret = false;
347 }
348 boost::interprocess::message_queue::remove(test::get_process_id_name());
349 return ret;
350}
351
352
353int main ()
354{
355 if(!test_priority_order()){
356 return 1;
357 }
358
359 if(!test_serialize_db()){
360 return 1;
361 }
362
363 if(!test_buffer_overflow()){
364 return 1;
365 }
366
367 if(!test_multi_sender_receiver()){
368 return 1;
369 }
370
371 return 0;
372}
373
374#include <boost/interprocess/detail/config_end.hpp>