]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | ////////////////////////////////////////////////////////////////////////////// |
2 | // | |
3 | // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost | |
4 | // Software License, Version 1.0. (See accompanying file | |
5 | // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // See http://www.boost.org/libs/interprocess for documentation. | |
8 | // | |
9 | ////////////////////////////////////////////////////////////////////////////// | |
10 | ||
11 | #include <boost/interprocess/detail/config_begin.hpp> | |
12 | #include <boost/interprocess/ipc/message_queue.hpp> | |
13 | #include <boost/interprocess/managed_external_buffer.hpp> | |
14 | #include <boost/interprocess/managed_heap_memory.hpp> | |
15 | #include <boost/interprocess/containers/map.hpp> | |
16 | #include <boost/interprocess/containers/set.hpp> | |
17 | #include <boost/interprocess/allocators/node_allocator.hpp> | |
18 | #include <boost/interprocess/detail/os_thread_functions.hpp> | |
19 | // intrusive/detail | |
20 | #include <boost/intrusive/detail/minimal_pair_header.hpp> | |
21 | #include <boost/intrusive/detail/minimal_less_equal_header.hpp> | |
22 | ||
23 | #include <cstddef> | |
24 | #include <memory> | |
25 | #include <iostream> | |
26 | #include <vector> | |
27 | #include <stdexcept> | |
28 | #include <limits> | |
29 | ||
30 | #include "get_process_id_name.hpp" | |
31 | ||
32 | //////////////////////////////////////////////////////////////////////////////// | |
33 | // // | |
34 | // This example tests the process shared message queue. // | |
35 | // // | |
36 | //////////////////////////////////////////////////////////////////////////////// | |
37 | ||
38 | using namespace boost::interprocess; | |
39 | ||
40 | //This test inserts messages with different priority and marks them with a | |
41 | //time-stamp to check if receiver obtains highest priority messages first and | |
42 | //messages with same priority are received in fifo order | |
43 | bool test_priority_order() | |
44 | { | |
45 | message_queue::remove(test::get_process_id_name()); | |
46 | { | |
47 | message_queue mq1 | |
48 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)), | |
49 | mq2 | |
50 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)); | |
51 | ||
52 | //We test that the queue is ordered by priority and in the | |
53 | //same priority, is a FIFO | |
54 | message_queue::size_type recvd = 0; | |
55 | unsigned int priority = 0; | |
56 | std::size_t tstamp; | |
57 | unsigned int priority_prev; | |
58 | std::size_t tstamp_prev; | |
59 | ||
60 | //We will send 100 message with priority 0-9 | |
61 | //The message will contain the timestamp of the message | |
62 | for(std::size_t i = 0; i < 100; ++i){ | |
63 | tstamp = i; | |
64 | mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10)); | |
65 | } | |
66 | ||
67 | priority_prev = (std::numeric_limits<unsigned int>::max)(); | |
68 | tstamp_prev = 0; | |
69 | ||
70 | //Receive all messages and test those are ordered | |
71 | //by priority and by FIFO in the same priority | |
72 | for(std::size_t i = 0; i < 100; ++i){ | |
73 | mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); | |
74 | if(priority > priority_prev) | |
75 | return false; | |
76 | if(priority == priority_prev && | |
77 | tstamp <= tstamp_prev){ | |
78 | return false; | |
79 | } | |
80 | priority_prev = priority; | |
81 | tstamp_prev = tstamp; | |
82 | } | |
83 | ||
84 | //Now retry it with different priority order | |
85 | for(std::size_t i = 0; i < 100; ++i){ | |
86 | tstamp = i; | |
87 | mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10)); | |
88 | } | |
89 | ||
90 | priority_prev = (std::numeric_limits<unsigned int>::max)(); | |
91 | tstamp_prev = 0; | |
92 | ||
93 | //Receive all messages and test those are ordered | |
94 | //by priority and by FIFO in the same priority | |
95 | for(std::size_t i = 0; i < 100; ++i){ | |
96 | mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); | |
97 | if(priority > priority_prev) | |
98 | return false; | |
99 | if(priority == priority_prev && | |
100 | tstamp <= tstamp_prev){ | |
101 | return false; | |
102 | } | |
103 | priority_prev = priority; | |
104 | tstamp_prev = tstamp; | |
105 | } | |
106 | } | |
107 | message_queue::remove(test::get_process_id_name()); | |
108 | return true; | |
109 | } | |
110 | ||
111 | //[message_queue_test_test_serialize_db | |
112 | //This test creates a in memory data-base using Interprocess machinery and | |
113 | //serializes it through a message queue. Then rebuilds the data-base in | |
114 | //another buffer and checks it against the original data-base | |
115 | bool test_serialize_db() | |
116 | { | |
117 | //Typedef data to create a Interprocess map | |
118 | typedef std::pair<const std::size_t, std::size_t> MyPair; | |
119 | typedef std::less<std::size_t> MyLess; | |
120 | typedef node_allocator<MyPair, managed_external_buffer::segment_manager> | |
121 | node_allocator_t; | |
122 | typedef map<std::size_t, | |
123 | std::size_t, | |
124 | std::less<std::size_t>, | |
125 | node_allocator_t> | |
126 | MyMap; | |
127 | ||
128 | //Some constants | |
129 | const std::size_t BufferSize = 65536; | |
130 | const std::size_t MaxMsgSize = 100; | |
131 | ||
132 | //Allocate a memory buffer to hold the destiny database using vector<char> | |
133 | std::vector<char> buffer_destiny(BufferSize, 0); | |
134 | ||
135 | message_queue::remove(test::get_process_id_name()); | |
136 | { | |
137 | //Create the message-queues | |
138 | message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize); | |
139 | ||
140 | //Open previously created message-queue simulating other process | |
141 | message_queue mq2(open_only, test::get_process_id_name()); | |
142 | ||
143 | //A managed heap memory to create the origin database | |
144 | managed_heap_memory db_origin(buffer_destiny.size()); | |
145 | ||
146 | //Construct the map in the first buffer | |
147 | MyMap *map1 = db_origin.construct<MyMap>("MyMap") | |
148 | (MyLess(), | |
149 | db_origin.get_segment_manager()); | |
150 | if(!map1) | |
151 | return false; | |
152 | ||
153 | //Fill map1 until is full | |
154 | try{ | |
155 | std::size_t i = 0; | |
156 | while(1){ | |
157 | (*map1)[i] = i; | |
158 | ++i; | |
159 | } | |
160 | } | |
161 | catch(boost::interprocess::bad_alloc &){} | |
162 | ||
163 | //Data control data sending through the message queue | |
164 | std::size_t sent = 0; | |
165 | message_queue::size_type recvd = 0; | |
166 | message_queue::size_type total_recvd = 0; | |
167 | unsigned int priority; | |
168 | ||
169 | //Send whole first buffer through the mq1, read it | |
170 | //through mq2 to the second buffer | |
171 | while(1){ | |
172 | //Send a fragment of buffer1 through mq1 | |
173 | std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? | |
174 | MaxMsgSize : (db_origin.get_size() - sent); | |
175 | mq1.send( &static_cast<char*>(db_origin.get_address())[sent] | |
176 | , bytes_to_send | |
177 | , 0); | |
178 | sent += bytes_to_send; | |
179 | //Receive the fragment through mq2 to buffer_destiny | |
180 | mq2.receive( &buffer_destiny[total_recvd] | |
181 | , BufferSize - recvd | |
182 | , recvd | |
183 | , priority); | |
184 | total_recvd += recvd; | |
185 | ||
186 | //Check if we have received all the buffer | |
187 | if(total_recvd == BufferSize){ | |
188 | break; | |
189 | } | |
190 | } | |
191 | ||
192 | //The buffer will contain a copy of the original database | |
193 | //so let's interpret the buffer with managed_external_buffer | |
194 | managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize); | |
195 | ||
196 | //Let's find the map | |
197 | std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap"); | |
198 | MyMap *map2 = ret.first; | |
199 | ||
200 | //Check if we have found it | |
201 | if(!map2){ | |
202 | return false; | |
203 | } | |
204 | ||
205 | //Check if it is a single variable (not an array) | |
206 | if(ret.second != 1){ | |
207 | return false; | |
208 | } | |
209 | ||
210 | //Now let's compare size | |
211 | if(map1->size() != map2->size()){ | |
212 | return false; | |
213 | } | |
214 | ||
215 | //Now let's compare all db values | |
216 | MyMap::size_type num_elements = map1->size(); | |
217 | for(std::size_t i = 0; i < num_elements; ++i){ | |
218 | if((*map1)[i] != (*map2)[i]){ | |
219 | return false; | |
220 | } | |
221 | } | |
222 | ||
223 | //Destroy maps from db-s | |
224 | db_origin.destroy_ptr(map1); | |
225 | db_destiny.destroy_ptr(map2); | |
226 | } | |
227 | message_queue::remove(test::get_process_id_name()); | |
228 | return true; | |
229 | } | |
230 | //] | |
231 | ||
232 | static const int MsgSize = 10; | |
233 | static const int NumMsg = 1000; | |
234 | static char msgsend [10]; | |
235 | static char msgrecv [10]; | |
236 | ||
237 | static boost::interprocess::message_queue *pmessage_queue; | |
238 | ||
239 | void receiver() | |
240 | { | |
241 | boost::interprocess::message_queue::size_type recvd_size; | |
242 | unsigned int priority; | |
243 | int nummsg = NumMsg; | |
244 | ||
245 | while(nummsg--){ | |
246 | pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority); | |
247 | } | |
248 | } | |
249 | ||
250 | bool test_buffer_overflow() | |
251 | { | |
252 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
253 | { | |
254 | std::auto_ptr<boost::interprocess::message_queue> | |
255 | ptr(new boost::interprocess::message_queue | |
256 | (create_only, test::get_process_id_name(), 10, 10)); | |
257 | pmessage_queue = ptr.get(); | |
258 | ||
259 | //Launch the receiver thread | |
260 | boost::interprocess::ipcdetail::OS_thread_t thread; | |
261 | boost::interprocess::ipcdetail::thread_launch(thread, &receiver); | |
262 | boost::interprocess::ipcdetail::thread_yield(); | |
263 | ||
264 | int nummsg = NumMsg; | |
265 | ||
266 | while(nummsg--){ | |
267 | pmessage_queue->send(msgsend, MsgSize, 0); | |
268 | } | |
269 | ||
270 | boost::interprocess::ipcdetail::thread_join(thread); | |
271 | } | |
272 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
273 | return true; | |
274 | } | |
275 | ||
276 | ||
277 | ////////////////////////////////////////////////////////////////////////////// | |
278 | // | |
279 | // test_multi_sender_receiver is based on Alexander (aalutov's) | |
280 | // testcase for ticket #9221. Many thanks. | |
281 | // | |
282 | ////////////////////////////////////////////////////////////////////////////// | |
283 | ||
284 | static boost::interprocess::message_queue *global_queue = 0; | |
285 | //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender | |
286 | static const int MULTI_NUM_MSG_PER_SENDER = 10000; | |
287 | //Message queue message capacity | |
288 | static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1; | |
289 | //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers | |
290 | static const int MULTI_THREAD_COUNT = 10; | |
291 | ||
292 | static void multisend() | |
293 | { | |
294 | char buff; | |
295 | for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) { | |
296 | global_queue->send(&buff, 1, 0); | |
297 | } | |
298 | global_queue->send(&buff, 0, 0); | |
299 | //std::cout<<"writer thread complete"<<std::endl; | |
300 | } | |
301 | ||
302 | static void multireceive() | |
303 | { | |
304 | char buff; | |
305 | size_t size; | |
306 | int received_msgs = 0; | |
307 | unsigned int priority; | |
308 | do { | |
309 | global_queue->receive(&buff, 1, size, priority); | |
310 | ++received_msgs; | |
311 | } while (size > 0); | |
312 | --received_msgs; | |
313 | //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl; | |
314 | } | |
315 | ||
316 | ||
317 | bool test_multi_sender_receiver() | |
318 | { | |
319 | bool ret = true; | |
320 | //std::cout << "Testing multi-sender / multi-receiver " << std::endl; | |
321 | try { | |
322 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
323 | boost::interprocess::message_queue mq | |
324 | (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1); | |
325 | global_queue = &mq; | |
326 | std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2); | |
327 | ||
328 | //Launch senders receiver thread | |
329 | for (int i = 0; i < MULTI_THREAD_COUNT; i++) { | |
330 | boost::interprocess::ipcdetail::thread_launch | |
331 | (threads[i], &multisend); | |
332 | } | |
333 | ||
334 | for (int i = 0; i < MULTI_THREAD_COUNT; i++) { | |
335 | boost::interprocess::ipcdetail::thread_launch | |
336 | (threads[MULTI_THREAD_COUNT+i], &multireceive); | |
337 | } | |
338 | ||
339 | for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) { | |
340 | boost::interprocess::ipcdetail::thread_join(threads[i]); | |
341 | //std::cout << "Joined thread " << i << std::endl; | |
342 | } | |
343 | } | |
344 | catch (std::exception &e) { | |
345 | std::cout << "error " << e.what() << std::endl; | |
346 | ret = false; | |
347 | } | |
348 | boost::interprocess::message_queue::remove(test::get_process_id_name()); | |
349 | return ret; | |
350 | } | |
351 | ||
352 | ||
353 | int main () | |
354 | { | |
355 | if(!test_priority_order()){ | |
356 | return 1; | |
357 | } | |
358 | ||
359 | if(!test_serialize_db()){ | |
360 | return 1; | |
361 | } | |
362 | ||
363 | if(!test_buffer_overflow()){ | |
364 | return 1; | |
365 | } | |
366 | ||
367 | if(!test_multi_sender_receiver()){ | |
368 | return 1; | |
369 | } | |
370 | ||
371 | return 0; | |
372 | } | |
373 | ||
374 | #include <boost/interprocess/detail/config_end.hpp> |