]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/interprocess/test/message_queue_test.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / interprocess / test / message_queue_test.cpp
CommitLineData
7c673cae
FG
1//////////////////////////////////////////////////////////////////////////////
2//
3// (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
4// Software License, Version 1.0. (See accompanying file
5// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// See http://www.boost.org/libs/interprocess for documentation.
8//
9//////////////////////////////////////////////////////////////////////////////
10
11#include <boost/interprocess/detail/config_begin.hpp>
12#include <boost/interprocess/ipc/message_queue.hpp>
13#include <boost/interprocess/managed_external_buffer.hpp>
14#include <boost/interprocess/managed_heap_memory.hpp>
15#include <boost/interprocess/containers/map.hpp>
16#include <boost/interprocess/containers/set.hpp>
17#include <boost/interprocess/allocators/node_allocator.hpp>
18#include <boost/interprocess/detail/os_thread_functions.hpp>
19// intrusive/detail
20#include <boost/intrusive/detail/minimal_pair_header.hpp>
21#include <boost/intrusive/detail/minimal_less_equal_header.hpp>
22
b32b8144
FG
23#include <boost/move/unique_ptr.hpp>
24
7c673cae
FG
25#include <cstddef>
26#include <memory>
27#include <iostream>
28#include <vector>
29#include <stdexcept>
30#include <limits>
31
32#include "get_process_id_name.hpp"
33
34////////////////////////////////////////////////////////////////////////////////
35// //
36// This example tests the process shared message queue. //
37// //
38////////////////////////////////////////////////////////////////////////////////
39
40using namespace boost::interprocess;
41
42//This test inserts messages with different priority and marks them with a
43//time-stamp to check if receiver obtains highest priority messages first and
44//messages with same priority are received in fifo order
45bool test_priority_order()
46{
47 message_queue::remove(test::get_process_id_name());
48 {
49 message_queue mq1
50 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
51 mq2
52 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
53
54 //We test that the queue is ordered by priority and in the
55 //same priority, is a FIFO
56 message_queue::size_type recvd = 0;
57 unsigned int priority = 0;
58 std::size_t tstamp;
59 unsigned int priority_prev;
60 std::size_t tstamp_prev;
61
62 //We will send 100 message with priority 0-9
63 //The message will contain the timestamp of the message
64 for(std::size_t i = 0; i < 100; ++i){
65 tstamp = i;
66 mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
67 }
68
69 priority_prev = (std::numeric_limits<unsigned int>::max)();
70 tstamp_prev = 0;
71
72 //Receive all messages and test those are ordered
73 //by priority and by FIFO in the same priority
74 for(std::size_t i = 0; i < 100; ++i){
75 mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
76 if(priority > priority_prev)
77 return false;
78 if(priority == priority_prev &&
79 tstamp <= tstamp_prev){
80 return false;
81 }
82 priority_prev = priority;
83 tstamp_prev = tstamp;
84 }
85
86 //Now retry it with different priority order
87 for(std::size_t i = 0; i < 100; ++i){
88 tstamp = i;
89 mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
90 }
91
92 priority_prev = (std::numeric_limits<unsigned int>::max)();
93 tstamp_prev = 0;
94
95 //Receive all messages and test those are ordered
96 //by priority and by FIFO in the same priority
97 for(std::size_t i = 0; i < 100; ++i){
98 mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
99 if(priority > priority_prev)
100 return false;
101 if(priority == priority_prev &&
102 tstamp <= tstamp_prev){
103 return false;
104 }
105 priority_prev = priority;
106 tstamp_prev = tstamp;
107 }
108 }
109 message_queue::remove(test::get_process_id_name());
110 return true;
111}
112
113//[message_queue_test_test_serialize_db
114//This test creates a in memory data-base using Interprocess machinery and
115//serializes it through a message queue. Then rebuilds the data-base in
116//another buffer and checks it against the original data-base
117bool test_serialize_db()
118{
119 //Typedef data to create a Interprocess map
120 typedef std::pair<const std::size_t, std::size_t> MyPair;
121 typedef std::less<std::size_t> MyLess;
122 typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
123 node_allocator_t;
124 typedef map<std::size_t,
125 std::size_t,
126 std::less<std::size_t>,
127 node_allocator_t>
128 MyMap;
129
130 //Some constants
131 const std::size_t BufferSize = 65536;
132 const std::size_t MaxMsgSize = 100;
133
134 //Allocate a memory buffer to hold the destiny database using vector<char>
135 std::vector<char> buffer_destiny(BufferSize, 0);
136
137 message_queue::remove(test::get_process_id_name());
138 {
139 //Create the message-queues
140 message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
141
142 //Open previously created message-queue simulating other process
143 message_queue mq2(open_only, test::get_process_id_name());
144
145 //A managed heap memory to create the origin database
146 managed_heap_memory db_origin(buffer_destiny.size());
147
148 //Construct the map in the first buffer
149 MyMap *map1 = db_origin.construct<MyMap>("MyMap")
150 (MyLess(),
151 db_origin.get_segment_manager());
152 if(!map1)
153 return false;
154
155 //Fill map1 until is full
156 try{
157 std::size_t i = 0;
158 while(1){
159 (*map1)[i] = i;
160 ++i;
161 }
162 }
163 catch(boost::interprocess::bad_alloc &){}
164
165 //Data control data sending through the message queue
166 std::size_t sent = 0;
167 message_queue::size_type recvd = 0;
168 message_queue::size_type total_recvd = 0;
169 unsigned int priority;
170
171 //Send whole first buffer through the mq1, read it
172 //through mq2 to the second buffer
173 while(1){
174 //Send a fragment of buffer1 through mq1
175 std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
176 MaxMsgSize : (db_origin.get_size() - sent);
177 mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
178 , bytes_to_send
179 , 0);
180 sent += bytes_to_send;
181 //Receive the fragment through mq2 to buffer_destiny
182 mq2.receive( &buffer_destiny[total_recvd]
183 , BufferSize - recvd
184 , recvd
185 , priority);
186 total_recvd += recvd;
187
188 //Check if we have received all the buffer
189 if(total_recvd == BufferSize){
190 break;
191 }
192 }
193
194 //The buffer will contain a copy of the original database
195 //so let's interpret the buffer with managed_external_buffer
196 managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
197
198 //Let's find the map
199 std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
200 MyMap *map2 = ret.first;
201
202 //Check if we have found it
203 if(!map2){
204 return false;
205 }
206
207 //Check if it is a single variable (not an array)
208 if(ret.second != 1){
209 return false;
210 }
211
212 //Now let's compare size
213 if(map1->size() != map2->size()){
214 return false;
215 }
216
217 //Now let's compare all db values
218 MyMap::size_type num_elements = map1->size();
219 for(std::size_t i = 0; i < num_elements; ++i){
220 if((*map1)[i] != (*map2)[i]){
221 return false;
222 }
223 }
224
225 //Destroy maps from db-s
226 db_origin.destroy_ptr(map1);
227 db_destiny.destroy_ptr(map2);
228 }
229 message_queue::remove(test::get_process_id_name());
230 return true;
231}
232//]
233
234static const int MsgSize = 10;
235static const int NumMsg = 1000;
236static char msgsend [10];
237static char msgrecv [10];
238
239static boost::interprocess::message_queue *pmessage_queue;
240
241void receiver()
242{
243 boost::interprocess::message_queue::size_type recvd_size;
244 unsigned int priority;
245 int nummsg = NumMsg;
246
247 while(nummsg--){
248 pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
249 }
250}
251
252bool test_buffer_overflow()
253{
254 boost::interprocess::message_queue::remove(test::get_process_id_name());
255 {
b32b8144 256 boost::movelib::unique_ptr<boost::interprocess::message_queue>
7c673cae
FG
257 ptr(new boost::interprocess::message_queue
258 (create_only, test::get_process_id_name(), 10, 10));
259 pmessage_queue = ptr.get();
260
261 //Launch the receiver thread
262 boost::interprocess::ipcdetail::OS_thread_t thread;
263 boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
264 boost::interprocess::ipcdetail::thread_yield();
265
266 int nummsg = NumMsg;
267
268 while(nummsg--){
269 pmessage_queue->send(msgsend, MsgSize, 0);
270 }
271
272 boost::interprocess::ipcdetail::thread_join(thread);
273 }
274 boost::interprocess::message_queue::remove(test::get_process_id_name());
275 return true;
276}
277
278
279//////////////////////////////////////////////////////////////////////////////
280//
281// test_multi_sender_receiver is based on Alexander (aalutov's)
282// testcase for ticket #9221. Many thanks.
283//
284//////////////////////////////////////////////////////////////////////////////
285
286static boost::interprocess::message_queue *global_queue = 0;
287//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
288static const int MULTI_NUM_MSG_PER_SENDER = 10000;
289//Message queue message capacity
290static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
291//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
292static const int MULTI_THREAD_COUNT = 10;
293
294static void multisend()
295{
296 char buff;
297 for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
298 global_queue->send(&buff, 1, 0);
299 }
300 global_queue->send(&buff, 0, 0);
301 //std::cout<<"writer thread complete"<<std::endl;
302}
303
304static void multireceive()
305{
306 char buff;
307 size_t size;
308 int received_msgs = 0;
309 unsigned int priority;
310 do {
311 global_queue->receive(&buff, 1, size, priority);
312 ++received_msgs;
313 } while (size > 0);
314 --received_msgs;
315 //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
316}
317
318
319bool test_multi_sender_receiver()
320{
321 bool ret = true;
322 //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
323 try {
324 boost::interprocess::message_queue::remove(test::get_process_id_name());
325 boost::interprocess::message_queue mq
326 (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
327 global_queue = &mq;
328 std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
329
330 //Launch senders receiver thread
331 for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
332 boost::interprocess::ipcdetail::thread_launch
333 (threads[i], &multisend);
334 }
335
336 for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
337 boost::interprocess::ipcdetail::thread_launch
338 (threads[MULTI_THREAD_COUNT+i], &multireceive);
339 }
340
341 for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
342 boost::interprocess::ipcdetail::thread_join(threads[i]);
343 //std::cout << "Joined thread " << i << std::endl;
344 }
345 }
346 catch (std::exception &e) {
347 std::cout << "error " << e.what() << std::endl;
348 ret = false;
349 }
350 boost::interprocess::message_queue::remove(test::get_process_id_name());
351 return ret;
352}
353
354
355int main ()
356{
357 if(!test_priority_order()){
358 return 1;
359 }
360
361 if(!test_serialize_db()){
362 return 1;
363 }
364
365 if(!test_buffer_overflow()){
366 return 1;
367 }
368
369 if(!test_multi_sender_receiver()){
370 return 1;
371 }
372
373 return 0;
374}
375
376#include <boost/interprocess/detail/config_end.hpp>