]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | ////////////////////////////////////////////////////////////////////////////// |
2 | // | |
3 | // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost | |
4 | // Software License, Version 1.0. (See accompanying file | |
5 | // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // See http://www.boost.org/libs/interprocess for documentation. | |
8 | // | |
9 | ////////////////////////////////////////////////////////////////////////////// | |
10 | ||
7c673cae FG |
11 | #include <boost/interprocess/ipc/message_queue.hpp> |
12 | #include <boost/interprocess/managed_external_buffer.hpp> | |
13 | #include <boost/interprocess/managed_heap_memory.hpp> | |
14 | #include <boost/interprocess/containers/map.hpp> | |
15 | #include <boost/interprocess/containers/set.hpp> | |
16 | #include <boost/interprocess/allocators/node_allocator.hpp> | |
17 | #include <boost/interprocess/detail/os_thread_functions.hpp> | |
18 | // intrusive/detail | |
19 | #include <boost/intrusive/detail/minimal_pair_header.hpp> | |
20 | #include <boost/intrusive/detail/minimal_less_equal_header.hpp> | |
21 | ||
b32b8144 FG |
22 | #include <boost/move/unique_ptr.hpp> |
23 | ||
7c673cae FG |
24 | #include <cstddef> |
25 | #include <memory> | |
26 | #include <iostream> | |
27 | #include <vector> | |
1e59de90 | 28 | #include <exception> |
7c673cae FG |
29 | #include <limits> |
30 | ||
31 | #include "get_process_id_name.hpp" | |
1e59de90 | 32 | #include "named_creation_template.hpp" |
7c673cae FG |
33 | |
34 | //////////////////////////////////////////////////////////////////////////////// | |
35 | // // | |
36 | // This example tests the process shared message queue. // | |
37 | // // | |
38 | //////////////////////////////////////////////////////////////////////////////// | |
39 | ||
40 | using namespace boost::interprocess; | |
41 | ||
42 | //This test inserts messages with different priority and marks them with a | |
43 | //time-stamp to check if receiver obtains highest priority messages first and | |
44 | //messages with same priority are received in fifo order | |
45 | bool test_priority_order() | |
46 | { | |
47 | message_queue::remove(test::get_process_id_name()); | |
48 | { | |
49 | message_queue mq1 | |
50 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)), | |
51 | mq2 | |
52 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)); | |
53 | ||
54 | //We test that the queue is ordered by priority and in the | |
55 | //same priority, is a FIFO | |
56 | message_queue::size_type recvd = 0; | |
57 | unsigned int priority = 0; | |
58 | std::size_t tstamp; | |
59 | unsigned int priority_prev; | |
60 | std::size_t tstamp_prev; | |
61 | ||
62 | //We will send 100 message with priority 0-9 | |
63 | //The message will contain the timestamp of the message | |
64 | for(std::size_t i = 0; i < 100; ++i){ | |
65 | tstamp = i; | |
66 | mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10)); | |
67 | } | |
68 | ||
69 | priority_prev = (std::numeric_limits<unsigned int>::max)(); | |
70 | tstamp_prev = 0; | |
71 | ||
72 | //Receive all messages and test those are ordered | |
73 | //by priority and by FIFO in the same priority | |
74 | for(std::size_t i = 0; i < 100; ++i){ | |
75 | mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); | |
76 | if(priority > priority_prev) | |
77 | return false; | |
78 | if(priority == priority_prev && | |
79 | tstamp <= tstamp_prev){ | |
80 | return false; | |
81 | } | |
82 | priority_prev = priority; | |
83 | tstamp_prev = tstamp; | |
84 | } | |
85 | ||
86 | //Now retry it with different priority order | |
87 | for(std::size_t i = 0; i < 100; ++i){ | |
88 | tstamp = i; | |
89 | mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10)); | |
90 | } | |
91 | ||
92 | priority_prev = (std::numeric_limits<unsigned int>::max)(); | |
93 | tstamp_prev = 0; | |
94 | ||
95 | //Receive all messages and test those are ordered | |
96 | //by priority and by FIFO in the same priority | |
97 | for(std::size_t i = 0; i < 100; ++i){ | |
98 | mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); | |
99 | if(priority > priority_prev) | |
100 | return false; | |
101 | if(priority == priority_prev && | |
102 | tstamp <= tstamp_prev){ | |
103 | return false; | |
104 | } | |
105 | priority_prev = priority; | |
106 | tstamp_prev = tstamp; | |
107 | } | |
108 | } | |
109 | message_queue::remove(test::get_process_id_name()); | |
110 | return true; | |
111 | } | |
112 | ||
113 | //[message_queue_test_test_serialize_db | |
114 | //This test creates a in memory data-base using Interprocess machinery and | |
115 | //serializes it through a message queue. Then rebuilds the data-base in | |
116 | //another buffer and checks it against the original data-base | |
117 | bool test_serialize_db() | |
118 | { | |
119 | //Typedef data to create a Interprocess map | |
120 | typedef std::pair<const std::size_t, std::size_t> MyPair; | |
121 | typedef std::less<std::size_t> MyLess; | |
122 | typedef node_allocator<MyPair, managed_external_buffer::segment_manager> | |
123 | node_allocator_t; | |
124 | typedef map<std::size_t, | |
125 | std::size_t, | |
126 | std::less<std::size_t>, | |
127 | node_allocator_t> | |
128 | MyMap; | |
129 | ||
130 | //Some constants | |
131 | const std::size_t BufferSize = 65536; | |
132 | const std::size_t MaxMsgSize = 100; | |
133 | ||
134 | //Allocate a memory buffer to hold the destiny database using vector<char> | |
135 | std::vector<char> buffer_destiny(BufferSize, 0); | |
136 | ||
137 | message_queue::remove(test::get_process_id_name()); | |
138 | { | |
139 | //Create the message-queues | |
140 | message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize); | |
141 | ||
142 | //Open previously created message-queue simulating other process | |
143 | message_queue mq2(open_only, test::get_process_id_name()); | |
144 | ||
145 | //A managed heap memory to create the origin database | |
146 | managed_heap_memory db_origin(buffer_destiny.size()); | |
147 | ||
148 | //Construct the map in the first buffer | |
149 | MyMap *map1 = db_origin.construct<MyMap>("MyMap") | |
150 | (MyLess(), | |
151 | db_origin.get_segment_manager()); | |
152 | if(!map1) | |
153 | return false; | |
154 | ||
155 | //Fill map1 until is full | |
1e59de90 | 156 | BOOST_TRY{ |
7c673cae FG |
157 | std::size_t i = 0; |
158 | while(1){ | |
159 | (*map1)[i] = i; | |
160 | ++i; | |
161 | } | |
162 | } | |
1e59de90 | 163 | BOOST_CATCH(boost::interprocess::bad_alloc &){} BOOST_CATCH_END |
7c673cae FG |
164 | |
165 | //Data control data sending through the message queue | |
166 | std::size_t sent = 0; | |
167 | message_queue::size_type recvd = 0; | |
168 | message_queue::size_type total_recvd = 0; | |
169 | unsigned int priority; | |
170 | ||
171 | //Send whole first buffer through the mq1, read it | |
172 | //through mq2 to the second buffer | |
173 | while(1){ | |
174 | //Send a fragment of buffer1 through mq1 | |
175 | std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? | |
176 | MaxMsgSize : (db_origin.get_size() - sent); | |
177 | mq1.send( &static_cast<char*>(db_origin.get_address())[sent] | |
178 | , bytes_to_send | |
179 | , 0); | |
180 | sent += bytes_to_send; | |
181 | //Receive the fragment through mq2 to buffer_destiny | |
182 | mq2.receive( &buffer_destiny[total_recvd] | |
183 | , BufferSize - recvd | |
184 | , recvd | |
185 | , priority); | |
186 | total_recvd += recvd; | |
187 | ||
188 | //Check if we have received all the buffer | |
189 | if(total_recvd == BufferSize){ | |
190 | break; | |
191 | } | |
192 | } | |
193 | ||
194 | //The buffer will contain a copy of the original database | |
195 | //so let's interpret the buffer with managed_external_buffer | |
196 | managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize); | |
197 | ||
198 | //Let's find the map | |
199 | std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap"); | |
200 | MyMap *map2 = ret.first; | |
201 | ||
202 | //Check if we have found it | |
203 | if(!map2){ | |
204 | return false; | |
205 | } | |
206 | ||
207 | //Check if it is a single variable (not an array) | |
208 | if(ret.second != 1){ | |
209 | return false; | |
210 | } | |
211 | ||
212 | //Now let's compare size | |
213 | if(map1->size() != map2->size()){ | |
214 | return false; | |
215 | } | |
216 | ||
217 | //Now let's compare all db values | |
218 | MyMap::size_type num_elements = map1->size(); | |
219 | for(std::size_t i = 0; i < num_elements; ++i){ | |
220 | if((*map1)[i] != (*map2)[i]){ | |
221 | return false; | |
222 | } | |
223 | } | |
224 | ||
225 | //Destroy maps from db-s | |
226 | db_origin.destroy_ptr(map1); | |
227 | db_destiny.destroy_ptr(map2); | |
228 | } | |
229 | message_queue::remove(test::get_process_id_name()); | |
230 | return true; | |
231 | } | |
232 | //] | |
233 | ||
234 | static const int MsgSize = 10; | |
235 | static const int NumMsg = 1000; | |
236 | static char msgsend [10]; | |
237 | static char msgrecv [10]; | |
238 | ||
239 | static boost::interprocess::message_queue *pmessage_queue; | |
240 | ||
241 | void receiver() | |
242 | { | |
243 | boost::interprocess::message_queue::size_type recvd_size; | |
244 | unsigned int priority; | |
245 | int nummsg = NumMsg; | |
246 | ||
247 | while(nummsg--){ | |
248 | pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority); | |
249 | } | |
250 | } | |
251 | ||
252 | bool test_buffer_overflow() | |
253 | { | |
254 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
255 | { | |
b32b8144 | 256 | boost::movelib::unique_ptr<boost::interprocess::message_queue> |
7c673cae FG |
257 | ptr(new boost::interprocess::message_queue |
258 | (create_only, test::get_process_id_name(), 10, 10)); | |
259 | pmessage_queue = ptr.get(); | |
260 | ||
261 | //Launch the receiver thread | |
262 | boost::interprocess::ipcdetail::OS_thread_t thread; | |
263 | boost::interprocess::ipcdetail::thread_launch(thread, &receiver); | |
264 | boost::interprocess::ipcdetail::thread_yield(); | |
265 | ||
266 | int nummsg = NumMsg; | |
267 | ||
268 | while(nummsg--){ | |
269 | pmessage_queue->send(msgsend, MsgSize, 0); | |
270 | } | |
271 | ||
272 | boost::interprocess::ipcdetail::thread_join(thread); | |
273 | } | |
274 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
275 | return true; | |
276 | } | |
277 | ||
278 | ||
279 | ////////////////////////////////////////////////////////////////////////////// | |
280 | // | |
281 | // test_multi_sender_receiver is based on Alexander (aalutov's) | |
282 | // testcase for ticket #9221. Many thanks. | |
283 | // | |
284 | ////////////////////////////////////////////////////////////////////////////// | |
285 | ||
286 | static boost::interprocess::message_queue *global_queue = 0; | |
287 | //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender | |
288 | static const int MULTI_NUM_MSG_PER_SENDER = 10000; | |
289 | //Message queue message capacity | |
290 | static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1; | |
291 | //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers | |
1e59de90 | 292 | static const std::size_t MULTI_THREAD_COUNT = 10; |
7c673cae FG |
293 | |
294 | static void multisend() | |
295 | { | |
296 | char buff; | |
1e59de90 | 297 | for (std::size_t i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) { |
7c673cae FG |
298 | global_queue->send(&buff, 1, 0); |
299 | } | |
300 | global_queue->send(&buff, 0, 0); | |
301 | //std::cout<<"writer thread complete"<<std::endl; | |
302 | } | |
303 | ||
304 | static void multireceive() | |
305 | { | |
306 | char buff; | |
307 | size_t size; | |
308 | int received_msgs = 0; | |
309 | unsigned int priority; | |
310 | do { | |
311 | global_queue->receive(&buff, 1, size, priority); | |
312 | ++received_msgs; | |
313 | } while (size > 0); | |
314 | --received_msgs; | |
315 | //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl; | |
316 | } | |
317 | ||
318 | ||
319 | bool test_multi_sender_receiver() | |
320 | { | |
321 | bool ret = true; | |
322 | //std::cout << "Testing multi-sender / multi-receiver " << std::endl; | |
1e59de90 | 323 | BOOST_TRY { |
7c673cae FG |
324 | boost::interprocess::message_queue::remove(test::get_process_id_name()); |
325 | boost::interprocess::message_queue mq | |
326 | (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1); | |
327 | global_queue = &mq; | |
328 | std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2); | |
329 | ||
330 | //Launch senders receiver thread | |
1e59de90 | 331 | for (std::size_t i = 0; i < MULTI_THREAD_COUNT; i++) { |
7c673cae FG |
332 | boost::interprocess::ipcdetail::thread_launch |
333 | (threads[i], &multisend); | |
334 | } | |
335 | ||
1e59de90 | 336 | for (std::size_t i = 0; i < MULTI_THREAD_COUNT; i++) { |
7c673cae FG |
337 | boost::interprocess::ipcdetail::thread_launch |
338 | (threads[MULTI_THREAD_COUNT+i], &multireceive); | |
339 | } | |
340 | ||
1e59de90 | 341 | for (std::size_t i = 0; i < MULTI_THREAD_COUNT*2; i++) { |
7c673cae FG |
342 | boost::interprocess::ipcdetail::thread_join(threads[i]); |
343 | //std::cout << "Joined thread " << i << std::endl; | |
344 | } | |
345 | } | |
1e59de90 | 346 | BOOST_CATCH(std::exception &e) { |
7c673cae FG |
347 | std::cout << "error " << e.what() << std::endl; |
348 | ret = false; | |
1e59de90 | 349 | } BOOST_CATCH_END |
7c673cae FG |
350 | boost::interprocess::message_queue::remove(test::get_process_id_name()); |
351 | return ret; | |
352 | } | |
353 | ||
1e59de90 TL |
354 | class msg_queue_named_test_wrapper |
355 | : public test::named_sync_deleter<message_queue>, public message_queue | |
356 | { | |
357 | public: | |
358 | ||
359 | msg_queue_named_test_wrapper(create_only_t) | |
360 | : message_queue(create_only, test::get_process_id_name(), 10, 10) | |
361 | {} | |
362 | ||
363 | msg_queue_named_test_wrapper(open_only_t) | |
364 | : message_queue(open_only, test::get_process_id_name()) | |
365 | {} | |
366 | ||
367 | msg_queue_named_test_wrapper(open_or_create_t) | |
368 | : message_queue(open_or_create, test::get_process_id_name(), 10, 10) | |
369 | {} | |
370 | ||
371 | ~msg_queue_named_test_wrapper() | |
372 | {} | |
373 | }; | |
374 | ||
375 | #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) | |
376 | ||
377 | class msg_queue_named_test_wrapper_w | |
378 | : public test::named_sync_deleter_w<message_queue>, public message_queue | |
379 | { | |
380 | public: | |
381 | ||
382 | template <class CharT> | |
383 | msg_queue_named_test_wrapper_w(create_only_t) | |
384 | : message_queue(create_only, test::get_process_id_wname(), 10, 10) | |
385 | {} | |
386 | ||
387 | msg_queue_named_test_wrapper_w(open_only_t) | |
388 | : message_queue(open_only, test::get_process_id_wname()) | |
389 | {} | |
390 | ||
391 | msg_queue_named_test_wrapper_w(open_or_create_t) | |
392 | : message_queue(open_or_create, test::get_process_id_wname(), 10, 10) | |
393 | {} | |
394 | ||
395 | ~msg_queue_named_test_wrapper_w() | |
396 | {} | |
397 | }; | |
398 | ||
399 | #endif //defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) | |
400 | ||
7c673cae FG |
401 | |
402 | int main () | |
403 | { | |
1e59de90 TL |
404 | int ret = 0; |
405 | BOOST_TRY{ | |
406 | message_queue::remove(test::get_process_id_name()); | |
407 | test::test_named_creation<msg_queue_named_test_wrapper>(); | |
408 | #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) | |
409 | test::test_named_creation<msg_queue_named_test_wrapper>(); | |
410 | #endif | |
411 | ||
412 | if(!test_priority_order()){ | |
413 | return 1; | |
414 | } | |
7c673cae | 415 | |
1e59de90 TL |
416 | if(!test_serialize_db()){ |
417 | return 1; | |
418 | } | |
7c673cae | 419 | |
1e59de90 TL |
420 | if(!test_buffer_overflow()){ |
421 | return 1; | |
422 | } | |
7c673cae | 423 | |
1e59de90 TL |
424 | if(!test_multi_sender_receiver()){ |
425 | return 1; | |
426 | } | |
7c673cae | 427 | } |
1e59de90 TL |
428 | BOOST_CATCH(std::exception &ex) { |
429 | std::cout << ex.what() << std::endl; | |
430 | ret = 1; | |
431 | } BOOST_CATCH_END | |
432 | ||
433 | message_queue::remove(test::get_process_id_name()); | |
434 | return ret; | |
7c673cae FG |
435 | } |
436 |