]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | ////////////////////////////////////////////////////////////////////////////// |
2 | // | |
3 | // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost | |
4 | // Software License, Version 1.0. (See accompanying file | |
5 | // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // See http://www.boost.org/libs/interprocess for documentation. | |
8 | // | |
9 | ////////////////////////////////////////////////////////////////////////////// | |
10 | ||
7c673cae FG |
11 | #include <boost/interprocess/ipc/message_queue.hpp> |
12 | #include <boost/interprocess/managed_external_buffer.hpp> | |
13 | #include <boost/interprocess/managed_heap_memory.hpp> | |
14 | #include <boost/interprocess/containers/map.hpp> | |
15 | #include <boost/interprocess/containers/set.hpp> | |
16 | #include <boost/interprocess/allocators/node_allocator.hpp> | |
17 | #include <boost/interprocess/detail/os_thread_functions.hpp> | |
18 | // intrusive/detail | |
19 | #include <boost/intrusive/detail/minimal_pair_header.hpp> | |
20 | #include <boost/intrusive/detail/minimal_less_equal_header.hpp> | |
21 | ||
b32b8144 FG |
22 | #include <boost/move/unique_ptr.hpp> |
23 | ||
7c673cae FG |
24 | #include <cstddef> |
25 | #include <memory> | |
26 | #include <iostream> | |
27 | #include <vector> | |
28 | #include <stdexcept> | |
29 | #include <limits> | |
30 | ||
31 | #include "get_process_id_name.hpp" | |
32 | ||
33 | //////////////////////////////////////////////////////////////////////////////// | |
34 | // // | |
35 | // This example tests the process shared message queue. // | |
36 | // // | |
37 | //////////////////////////////////////////////////////////////////////////////// | |
38 | ||
39 | using namespace boost::interprocess; | |
40 | ||
41 | //This test inserts messages with different priority and marks them with a | |
42 | //time-stamp to check if receiver obtains highest priority messages first and | |
43 | //messages with same priority are received in fifo order | |
44 | bool test_priority_order() | |
45 | { | |
46 | message_queue::remove(test::get_process_id_name()); | |
47 | { | |
48 | message_queue mq1 | |
49 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)), | |
50 | mq2 | |
51 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)); | |
52 | ||
53 | //We test that the queue is ordered by priority and in the | |
54 | //same priority, is a FIFO | |
55 | message_queue::size_type recvd = 0; | |
56 | unsigned int priority = 0; | |
57 | std::size_t tstamp; | |
58 | unsigned int priority_prev; | |
59 | std::size_t tstamp_prev; | |
60 | ||
61 | //We will send 100 message with priority 0-9 | |
62 | //The message will contain the timestamp of the message | |
63 | for(std::size_t i = 0; i < 100; ++i){ | |
64 | tstamp = i; | |
65 | mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10)); | |
66 | } | |
67 | ||
68 | priority_prev = (std::numeric_limits<unsigned int>::max)(); | |
69 | tstamp_prev = 0; | |
70 | ||
71 | //Receive all messages and test those are ordered | |
72 | //by priority and by FIFO in the same priority | |
73 | for(std::size_t i = 0; i < 100; ++i){ | |
74 | mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); | |
75 | if(priority > priority_prev) | |
76 | return false; | |
77 | if(priority == priority_prev && | |
78 | tstamp <= tstamp_prev){ | |
79 | return false; | |
80 | } | |
81 | priority_prev = priority; | |
82 | tstamp_prev = tstamp; | |
83 | } | |
84 | ||
85 | //Now retry it with different priority order | |
86 | for(std::size_t i = 0; i < 100; ++i){ | |
87 | tstamp = i; | |
88 | mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10)); | |
89 | } | |
90 | ||
91 | priority_prev = (std::numeric_limits<unsigned int>::max)(); | |
92 | tstamp_prev = 0; | |
93 | ||
94 | //Receive all messages and test those are ordered | |
95 | //by priority and by FIFO in the same priority | |
96 | for(std::size_t i = 0; i < 100; ++i){ | |
97 | mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); | |
98 | if(priority > priority_prev) | |
99 | return false; | |
100 | if(priority == priority_prev && | |
101 | tstamp <= tstamp_prev){ | |
102 | return false; | |
103 | } | |
104 | priority_prev = priority; | |
105 | tstamp_prev = tstamp; | |
106 | } | |
107 | } | |
108 | message_queue::remove(test::get_process_id_name()); | |
109 | return true; | |
110 | } | |
111 | ||
112 | //[message_queue_test_test_serialize_db | |
113 | //This test creates a in memory data-base using Interprocess machinery and | |
114 | //serializes it through a message queue. Then rebuilds the data-base in | |
115 | //another buffer and checks it against the original data-base | |
116 | bool test_serialize_db() | |
117 | { | |
118 | //Typedef data to create a Interprocess map | |
119 | typedef std::pair<const std::size_t, std::size_t> MyPair; | |
120 | typedef std::less<std::size_t> MyLess; | |
121 | typedef node_allocator<MyPair, managed_external_buffer::segment_manager> | |
122 | node_allocator_t; | |
123 | typedef map<std::size_t, | |
124 | std::size_t, | |
125 | std::less<std::size_t>, | |
126 | node_allocator_t> | |
127 | MyMap; | |
128 | ||
129 | //Some constants | |
130 | const std::size_t BufferSize = 65536; | |
131 | const std::size_t MaxMsgSize = 100; | |
132 | ||
133 | //Allocate a memory buffer to hold the destiny database using vector<char> | |
134 | std::vector<char> buffer_destiny(BufferSize, 0); | |
135 | ||
136 | message_queue::remove(test::get_process_id_name()); | |
137 | { | |
138 | //Create the message-queues | |
139 | message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize); | |
140 | ||
141 | //Open previously created message-queue simulating other process | |
142 | message_queue mq2(open_only, test::get_process_id_name()); | |
143 | ||
144 | //A managed heap memory to create the origin database | |
145 | managed_heap_memory db_origin(buffer_destiny.size()); | |
146 | ||
147 | //Construct the map in the first buffer | |
148 | MyMap *map1 = db_origin.construct<MyMap>("MyMap") | |
149 | (MyLess(), | |
150 | db_origin.get_segment_manager()); | |
151 | if(!map1) | |
152 | return false; | |
153 | ||
154 | //Fill map1 until is full | |
155 | try{ | |
156 | std::size_t i = 0; | |
157 | while(1){ | |
158 | (*map1)[i] = i; | |
159 | ++i; | |
160 | } | |
161 | } | |
162 | catch(boost::interprocess::bad_alloc &){} | |
163 | ||
164 | //Data control data sending through the message queue | |
165 | std::size_t sent = 0; | |
166 | message_queue::size_type recvd = 0; | |
167 | message_queue::size_type total_recvd = 0; | |
168 | unsigned int priority; | |
169 | ||
170 | //Send whole first buffer through the mq1, read it | |
171 | //through mq2 to the second buffer | |
172 | while(1){ | |
173 | //Send a fragment of buffer1 through mq1 | |
174 | std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? | |
175 | MaxMsgSize : (db_origin.get_size() - sent); | |
176 | mq1.send( &static_cast<char*>(db_origin.get_address())[sent] | |
177 | , bytes_to_send | |
178 | , 0); | |
179 | sent += bytes_to_send; | |
180 | //Receive the fragment through mq2 to buffer_destiny | |
181 | mq2.receive( &buffer_destiny[total_recvd] | |
182 | , BufferSize - recvd | |
183 | , recvd | |
184 | , priority); | |
185 | total_recvd += recvd; | |
186 | ||
187 | //Check if we have received all the buffer | |
188 | if(total_recvd == BufferSize){ | |
189 | break; | |
190 | } | |
191 | } | |
192 | ||
193 | //The buffer will contain a copy of the original database | |
194 | //so let's interpret the buffer with managed_external_buffer | |
195 | managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize); | |
196 | ||
197 | //Let's find the map | |
198 | std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap"); | |
199 | MyMap *map2 = ret.first; | |
200 | ||
201 | //Check if we have found it | |
202 | if(!map2){ | |
203 | return false; | |
204 | } | |
205 | ||
206 | //Check if it is a single variable (not an array) | |
207 | if(ret.second != 1){ | |
208 | return false; | |
209 | } | |
210 | ||
211 | //Now let's compare size | |
212 | if(map1->size() != map2->size()){ | |
213 | return false; | |
214 | } | |
215 | ||
216 | //Now let's compare all db values | |
217 | MyMap::size_type num_elements = map1->size(); | |
218 | for(std::size_t i = 0; i < num_elements; ++i){ | |
219 | if((*map1)[i] != (*map2)[i]){ | |
220 | return false; | |
221 | } | |
222 | } | |
223 | ||
224 | //Destroy maps from db-s | |
225 | db_origin.destroy_ptr(map1); | |
226 | db_destiny.destroy_ptr(map2); | |
227 | } | |
228 | message_queue::remove(test::get_process_id_name()); | |
229 | return true; | |
230 | } | |
231 | //] | |
232 | ||
233 | static const int MsgSize = 10; | |
234 | static const int NumMsg = 1000; | |
235 | static char msgsend [10]; | |
236 | static char msgrecv [10]; | |
237 | ||
238 | static boost::interprocess::message_queue *pmessage_queue; | |
239 | ||
240 | void receiver() | |
241 | { | |
242 | boost::interprocess::message_queue::size_type recvd_size; | |
243 | unsigned int priority; | |
244 | int nummsg = NumMsg; | |
245 | ||
246 | while(nummsg--){ | |
247 | pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority); | |
248 | } | |
249 | } | |
250 | ||
251 | bool test_buffer_overflow() | |
252 | { | |
253 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
254 | { | |
b32b8144 | 255 | boost::movelib::unique_ptr<boost::interprocess::message_queue> |
7c673cae FG |
256 | ptr(new boost::interprocess::message_queue |
257 | (create_only, test::get_process_id_name(), 10, 10)); | |
258 | pmessage_queue = ptr.get(); | |
259 | ||
260 | //Launch the receiver thread | |
261 | boost::interprocess::ipcdetail::OS_thread_t thread; | |
262 | boost::interprocess::ipcdetail::thread_launch(thread, &receiver); | |
263 | boost::interprocess::ipcdetail::thread_yield(); | |
264 | ||
265 | int nummsg = NumMsg; | |
266 | ||
267 | while(nummsg--){ | |
268 | pmessage_queue->send(msgsend, MsgSize, 0); | |
269 | } | |
270 | ||
271 | boost::interprocess::ipcdetail::thread_join(thread); | |
272 | } | |
273 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
274 | return true; | |
275 | } | |
276 | ||
277 | ||
278 | ////////////////////////////////////////////////////////////////////////////// | |
279 | // | |
280 | // test_multi_sender_receiver is based on Alexander (aalutov's) | |
281 | // testcase for ticket #9221. Many thanks. | |
282 | // | |
283 | ////////////////////////////////////////////////////////////////////////////// | |
284 | ||
285 | static boost::interprocess::message_queue *global_queue = 0; | |
286 | //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender | |
287 | static const int MULTI_NUM_MSG_PER_SENDER = 10000; | |
288 | //Message queue message capacity | |
289 | static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1; | |
290 | //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers | |
291 | static const int MULTI_THREAD_COUNT = 10; | |
292 | ||
293 | static void multisend() | |
294 | { | |
295 | char buff; | |
296 | for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) { | |
297 | global_queue->send(&buff, 1, 0); | |
298 | } | |
299 | global_queue->send(&buff, 0, 0); | |
300 | //std::cout<<"writer thread complete"<<std::endl; | |
301 | } | |
302 | ||
303 | static void multireceive() | |
304 | { | |
305 | char buff; | |
306 | size_t size; | |
307 | int received_msgs = 0; | |
308 | unsigned int priority; | |
309 | do { | |
310 | global_queue->receive(&buff, 1, size, priority); | |
311 | ++received_msgs; | |
312 | } while (size > 0); | |
313 | --received_msgs; | |
314 | //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl; | |
315 | } | |
316 | ||
317 | ||
318 | bool test_multi_sender_receiver() | |
319 | { | |
320 | bool ret = true; | |
321 | //std::cout << "Testing multi-sender / multi-receiver " << std::endl; | |
322 | try { | |
323 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
324 | boost::interprocess::message_queue mq | |
325 | (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1); | |
326 | global_queue = &mq; | |
327 | std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2); | |
328 | ||
329 | //Launch senders receiver thread | |
330 | for (int i = 0; i < MULTI_THREAD_COUNT; i++) { | |
331 | boost::interprocess::ipcdetail::thread_launch | |
332 | (threads[i], &multisend); | |
333 | } | |
334 | ||
335 | for (int i = 0; i < MULTI_THREAD_COUNT; i++) { | |
336 | boost::interprocess::ipcdetail::thread_launch | |
337 | (threads[MULTI_THREAD_COUNT+i], &multireceive); | |
338 | } | |
339 | ||
340 | for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) { | |
341 | boost::interprocess::ipcdetail::thread_join(threads[i]); | |
342 | //std::cout << "Joined thread " << i << std::endl; | |
343 | } | |
344 | } | |
345 | catch (std::exception &e) { | |
346 | std::cout << "error " << e.what() << std::endl; | |
347 | ret = false; | |
348 | } | |
349 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
350 | return ret; | |
351 | } | |
352 | ||
353 | ||
354 | int main () | |
355 | { | |
356 | if(!test_priority_order()){ | |
357 | return 1; | |
358 | } | |
359 | ||
360 | if(!test_serialize_db()){ | |
361 | return 1; | |
362 | } | |
363 | ||
364 | if(!test_buffer_overflow()){ | |
365 | return 1; | |
366 | } | |
367 | ||
368 | if(!test_multi_sender_receiver()){ | |
369 | return 1; | |
370 | } | |
371 | ||
372 | return 0; | |
373 | } | |
374 |