2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
9 * \file util_ipc_reliable_mq.cpp
11 * \author Andrey Semashev
14 * \brief The test verifies that \c ipc::reliable_message_queue works.
17 #define BOOST_TEST_MODULE util_ipc_reliable_mq
19 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
20 #include <boost/log/utility/ipc/object_name.hpp>
21 #include <boost/log/utility/permissions.hpp>
22 #include <boost/log/utility/open_mode.hpp>
23 #include <boost/log/exceptions.hpp>
24 #include <boost/test/unit_test.hpp>
31 #include <boost/move/utility_core.hpp>
32 #if !defined(BOOST_LOG_NO_THREADS)
34 #include <boost/ref.hpp>
35 #include <boost/atomic/fences.hpp>
36 #include <boost/thread/thread.hpp>
37 #include <boost/chrono/duration.hpp>
39 #include "char_definitions.hpp"
41 typedef boost::log::ipc::reliable_message_queue queue_t
;
42 typedef queue_t::size_type size_type
;
44 const boost::log::ipc::object_name
ipc_queue_name(boost::log::ipc::object_name::session
, "boost_log_test_ipc_reliable_mq");
45 const unsigned int capacity
= 512;
46 const size_type block_size
= 1024;
47 const char message1
[] = "Hello, world!";
48 const char message2
[] = "Hello, the brand new world!";
50 BOOST_AUTO_TEST_CASE(basic_functionality
)
52 // Default constructor.
55 BOOST_CHECK(!queue
.is_open());
58 // Do a remove in case if a previous test failed
59 queue_t::remove(ipc_queue_name
);
61 // Opening a non-existing queue
64 queue_t
queue(boost::log::open_mode::open_only
, ipc_queue_name
);
65 BOOST_FAIL("Non-existing queue open succeeded, although it shouldn't have");
67 catch (std::exception
&)
69 BOOST_TEST_PASSPOINT();
72 // Create constructor and destructor.
74 queue_t
queue(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
75 BOOST_CHECK(equal_strings(queue
.name().c_str(), ipc_queue_name
.c_str()));
76 BOOST_CHECK(queue
.is_open());
77 BOOST_CHECK_EQUAL(queue
.capacity(), capacity
);
78 BOOST_CHECK_EQUAL(queue
.block_size(), block_size
);
81 // Creating a duplicate queue
84 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
85 queue_t
queue_b(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
86 BOOST_FAIL("Creating a duplicate queue succeeded, although it shouldn't have");
88 catch (std::exception
&)
90 BOOST_TEST_PASSPOINT();
93 // Opening an existing queue
95 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
96 BOOST_CHECK(queue_a
.is_open());
98 queue_t
queue_b(boost::log::open_mode::open_or_create
, ipc_queue_name
, capacity
* 2u, block_size
* 2u); // queue geometry differs from the existing queue
99 BOOST_CHECK(queue_b
.is_open());
100 BOOST_CHECK(equal_strings(queue_b
.name().c_str(), ipc_queue_name
.c_str()));
101 BOOST_CHECK_EQUAL(queue_b
.capacity(), capacity
);
102 BOOST_CHECK_EQUAL(queue_b
.block_size(), block_size
);
104 queue_t
queue_c(boost::log::open_mode::open_only
, ipc_queue_name
);
105 BOOST_CHECK(queue_c
.is_open());
106 BOOST_CHECK(equal_strings(queue_c
.name().c_str(), ipc_queue_name
.c_str()));
107 BOOST_CHECK_EQUAL(queue_c
.capacity(), capacity
);
108 BOOST_CHECK_EQUAL(queue_c
.block_size(), block_size
);
112 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
113 BOOST_CHECK(queue_a
.is_open());
115 BOOST_CHECK(!queue_a
.is_open());
118 BOOST_CHECK(!queue_a
.is_open());
122 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
123 queue_t
queue_b(boost::move(queue_a
));
124 BOOST_CHECK(!queue_a
.is_open());
125 BOOST_CHECK(equal_strings(queue_b
.name().c_str(), ipc_queue_name
.c_str()));
126 BOOST_CHECK(queue_b
.is_open());
127 BOOST_CHECK_EQUAL(queue_b
.capacity(), capacity
);
128 BOOST_CHECK_EQUAL(queue_b
.block_size(), block_size
);
130 // Move assignment operator.
132 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
134 queue_b
= boost::move(queue_a
);
135 BOOST_CHECK(!queue_a
.is_open());
136 BOOST_CHECK(equal_strings(queue_b
.name().c_str(), ipc_queue_name
.c_str()));
137 BOOST_CHECK(queue_b
.is_open());
138 BOOST_CHECK_EQUAL(queue_b
.capacity(), capacity
);
139 BOOST_CHECK_EQUAL(queue_b
.block_size(), block_size
);
141 // Member and non-member swaps.
143 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, capacity
, block_size
);
144 queue_a
.swap(queue_a
);
145 BOOST_CHECK(queue_a
.is_open());
146 BOOST_CHECK(equal_strings(queue_a
.name().c_str(), ipc_queue_name
.c_str()));
147 BOOST_CHECK_EQUAL(queue_a
.capacity(), capacity
);
148 BOOST_CHECK_EQUAL(queue_a
.block_size(), block_size
);
151 swap(queue_a
, queue_b
);
152 BOOST_CHECK(!queue_a
.is_open());
153 BOOST_CHECK(queue_b
.is_open());
154 BOOST_CHECK(equal_strings(queue_b
.name().c_str(), ipc_queue_name
.c_str()));
155 BOOST_CHECK_EQUAL(queue_b
.capacity(), capacity
);
156 BOOST_CHECK_EQUAL(queue_b
.block_size(), block_size
);
160 BOOST_AUTO_TEST_CASE(message_passing
)
162 // try_send() and try_receive()
164 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, 1u, block_size
);
165 queue_t
queue_b(boost::log::open_mode::open_only
, ipc_queue_name
);
166 BOOST_CHECK(queue_a
.try_send(message1
, sizeof(message1
) - 1u));
167 BOOST_CHECK(!queue_a
.try_send(message2
, sizeof(message2
) - 1u));
168 char buffer
[block_size
] = {};
169 size_type message_size
= 0u;
170 BOOST_CHECK(queue_b
.try_receive(buffer
, sizeof(buffer
), message_size
));
171 BOOST_CHECK_EQUAL(message_size
, sizeof(message1
) - 1u);
172 BOOST_CHECK(std::memcmp(buffer
, message1
, message_size
) == 0);
173 BOOST_CHECK(!queue_b
.try_receive(buffer
, sizeof(buffer
), message_size
));
175 BOOST_CHECK(queue_a
.try_send(message2
, sizeof(message2
) - 1u));
177 BOOST_CHECK(queue_b
.try_receive(msg
));
178 BOOST_CHECK_EQUAL(msg
.size(), sizeof(message2
) - 1u);
179 BOOST_CHECK_EQUAL(msg
, message2
);
181 BOOST_CHECK(queue_a
.try_send(message2
, sizeof(message2
) - 1u));
182 std::vector
< unsigned char > buf
;
183 BOOST_CHECK(queue_b
.try_receive(buf
));
184 BOOST_CHECK_EQUAL(buf
.size(), sizeof(message2
) - 1u);
185 BOOST_CHECK(std::memcmp(&buf
[0], message2
, buf
.size()) == 0);
188 // send() and receive() without blocking
190 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, 1u, block_size
);
191 queue_t
queue_b(boost::log::open_mode::open_only
, ipc_queue_name
);
192 BOOST_CHECK(queue_a
.send(message1
, sizeof(message1
) - 1u) == queue_t::succeeded
);
193 char buffer
[block_size
] = {};
194 size_type message_size
= 0u;
195 BOOST_CHECK(queue_b
.receive(buffer
, sizeof(buffer
), message_size
) == queue_t::succeeded
);
196 BOOST_CHECK_EQUAL(message_size
, sizeof(message1
) - 1u);
197 BOOST_CHECK(std::memcmp(buffer
, message1
, message_size
) == 0);
199 BOOST_CHECK(queue_a
.send(message2
, sizeof(message2
) - 1u) == queue_t::succeeded
);
201 BOOST_CHECK(queue_b
.receive(msg
) == queue_t::succeeded
);
202 BOOST_CHECK_EQUAL(msg
.size(), sizeof(message2
) - 1u);
203 BOOST_CHECK_EQUAL(msg
, message2
);
205 BOOST_CHECK(queue_a
.send(message2
, sizeof(message2
) - 1u) == queue_t::succeeded
);
206 std::vector
< unsigned char > buf
;
207 BOOST_CHECK(queue_b
.receive(buf
) == queue_t::succeeded
);
208 BOOST_CHECK_EQUAL(buf
.size(), sizeof(message2
) - 1u);
209 BOOST_CHECK(std::memcmp(&buf
[0], message2
, buf
.size()) == 0);
212 // send() with an error code on overflow
214 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, 1u, block_size
, queue_t::fail_on_overflow
);
215 BOOST_TEST_PASSPOINT();
216 BOOST_CHECK(queue_a
.send(message1
, sizeof(message1
) - 1u) == queue_t::succeeded
);
217 BOOST_TEST_PASSPOINT();
219 queue_t::operation_result res
= queue_a
.send(message1
, sizeof(message1
) - 1u);
220 BOOST_CHECK_EQUAL(res
, queue_t::no_space
);
223 // send() with an exception on overflow
225 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, 1u, block_size
, queue_t::throw_on_overflow
);
226 BOOST_TEST_PASSPOINT();
227 BOOST_CHECK(queue_a
.send(message1
, sizeof(message1
) - 1u) == queue_t::succeeded
);
228 BOOST_TEST_PASSPOINT();
231 queue_a
.send(message1
, sizeof(message1
) - 1u);
232 BOOST_FAIL("Owerflowing the queue succeeded, although it shouldn't have");
234 catch (boost::log::capacity_limit_reached
&)
236 BOOST_TEST_PASSPOINT();
240 // send() and receive() for messages larger than block_size. The message size and queue capacity below are such
241 // that the last enqueued message is expected to be split in the queue storage.
243 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, 5u, block_size
);
244 queue_t
queue_b(boost::log::open_mode::open_only
, ipc_queue_name
);
246 const size_type message_size
= block_size
* 3u / 2u;
247 std::vector
< unsigned char > send_data
;
248 send_data
.resize(message_size
);
249 for (unsigned int i
= 0; i
< message_size
; ++i
)
250 send_data
[i
] = static_cast< unsigned char >(i
& 0xFF);
252 BOOST_CHECK(queue_a
.send(&send_data
[0], static_cast< size_type
>(send_data
.size())) == queue_t::succeeded
);
254 for (unsigned int i
= 0; i
< 3; ++i
)
256 BOOST_CHECK(queue_a
.send(&send_data
[0], static_cast< size_type
>(send_data
.size())) == queue_t::succeeded
);
258 std::vector
< unsigned char > receive_data
;
259 BOOST_CHECK(queue_b
.receive(receive_data
) == queue_t::succeeded
);
260 BOOST_CHECK_EQUAL_COLLECTIONS(send_data
.begin(), send_data
.end(), receive_data
.begin(), receive_data
.end());
263 std::vector
< unsigned char > receive_data
;
264 BOOST_CHECK(queue_b
.receive(receive_data
) == queue_t::succeeded
);
265 BOOST_CHECK_EQUAL_COLLECTIONS(send_data
.begin(), send_data
.end(), receive_data
.begin(), receive_data
.end());
270 queue_t
queue_a(boost::log::open_mode::create_only
, ipc_queue_name
, 1u, block_size
);
271 queue_t
queue_b(boost::log::open_mode::open_only
, ipc_queue_name
);
272 BOOST_CHECK(queue_a
.try_send(message1
, sizeof(message1
) - 1u));
273 BOOST_CHECK(!queue_a
.try_send(message2
, sizeof(message2
) - 1u));
277 BOOST_CHECK(queue_a
.try_send(message2
, sizeof(message2
) - 1u));
278 char buffer
[block_size
] = {};
279 size_type message_size
= 0u;
280 BOOST_CHECK(queue_b
.try_receive(buffer
, sizeof(buffer
), message_size
));
281 BOOST_CHECK_EQUAL(message_size
, sizeof(message2
) - 1u);
282 BOOST_CHECK(std::memcmp(buffer
, message2
, message_size
) == 0);
286 #if !defined(BOOST_LOG_NO_THREADS)
290 const unsigned int message_count
= 100000;
292 void multithreaded_message_passing_feeding_thread(const char* message
, unsigned int& failure_count
)
294 const size_type len
= static_cast< size_type
>(std::strlen(message
));
295 queue_t
queue(boost::log::open_mode::open_or_create
, ipc_queue_name
, capacity
, block_size
);
296 for (unsigned int i
= 0; i
< message_count
; ++i
)
298 failure_count
+= queue
.send(message
, len
) != queue_t::succeeded
;
301 boost::atomic_thread_fence(boost::memory_order_release
);
306 BOOST_AUTO_TEST_CASE(multithreaded_message_passing
)
308 unsigned int failure_count1
= 0, failure_count2
= 0, failure_count3
= 0;
309 boost::atomic_thread_fence(boost::memory_order_release
);
311 boost::thread
thread1(&multithreaded_message_passing_feeding_thread
, "Thread 1", boost::ref(failure_count1
));
312 boost::thread
thread2(&multithreaded_message_passing_feeding_thread
, "Thread 2", boost::ref(failure_count2
));
313 boost::thread
thread3(&multithreaded_message_passing_feeding_thread
, "Thread 3", boost::ref(failure_count3
));
315 BOOST_TEST_PASSPOINT();
317 queue_t
queue(boost::log::open_mode::open_or_create
, ipc_queue_name
, capacity
, block_size
);
318 unsigned int receive_failures
= 0, receive_corruptions
= 0;
319 unsigned int message_count1
= 0, message_count2
= 0, message_count3
= 0;
322 BOOST_TEST_PASSPOINT();
324 for (unsigned int i
= 0; i
< message_count
* 3u; ++i
)
327 if (queue
.receive(msg
) == queue_t::succeeded
)
329 if (msg
== "Thread 1")
331 else if (msg
== "Thread 2")
333 else if (msg
== "Thread 3")
336 ++receive_corruptions
;
342 BOOST_TEST_PASSPOINT();
345 BOOST_TEST_PASSPOINT();
348 BOOST_TEST_PASSPOINT();
351 boost::atomic_thread_fence(boost::memory_order_acquire
);
353 BOOST_CHECK_EQUAL(failure_count1
, 0u);
354 BOOST_CHECK_EQUAL(message_count1
, message_count
);
355 BOOST_CHECK_EQUAL(failure_count2
, 0u);
356 BOOST_CHECK_EQUAL(message_count2
, message_count
);
357 BOOST_CHECK_EQUAL(failure_count3
, 0u);
358 BOOST_CHECK_EQUAL(message_count3
, message_count
);
359 BOOST_CHECK_EQUAL(receive_failures
, 0u);
360 BOOST_CHECK_EQUAL(receive_corruptions
, 0u);
365 void stop_reset_feeding_thread(queue_t
& queue
, queue_t::operation_result
* results
, unsigned int count
)
367 for (unsigned int i
= 0; i
< count
; ++i
)
369 results
[i
] = queue
.send(message1
, sizeof(message1
) - 1u);
370 if (results
[i
] != queue_t::succeeded
)
374 boost::atomic_thread_fence(boost::memory_order_release
);
377 void stop_reset_reading_thread(queue_t
& queue
, queue_t::operation_result
* results
, unsigned int count
)
380 for (unsigned int i
= 0; i
< count
; ++i
)
383 results
[i
] = queue
.receive(msg
);
384 if (results
[i
] != queue_t::succeeded
)
388 boost::atomic_thread_fence(boost::memory_order_release
);
393 BOOST_AUTO_TEST_CASE(stop_reset_local
)
395 queue_t
feeder_queue(boost::log::open_mode::open_or_create
, ipc_queue_name
, 1u, block_size
);
396 queue_t::operation_result feeder_results
[3];
397 queue_t
reader_queue(boost::log::open_mode::open_only
, ipc_queue_name
);
398 queue_t::operation_result reader_results
[3];
400 std::fill_n(feeder_results
, sizeof(feeder_results
) / sizeof(*feeder_results
), queue_t::succeeded
);
401 std::fill_n(reader_results
, sizeof(reader_results
) / sizeof(*reader_results
), queue_t::succeeded
);
402 boost::atomic_thread_fence(boost::memory_order_release
);
404 BOOST_TEST_PASSPOINT();
406 // Case 1: Let the feeder block and then we unblock it with stop_local()
407 boost::thread
feeder_thread(&stop_reset_feeding_thread
, boost::ref(feeder_queue
), feeder_results
, 3);
408 boost::thread
reader_thread(&stop_reset_reading_thread
, boost::ref(reader_queue
), reader_results
, 1);
410 BOOST_TEST_PASSPOINT();
412 reader_thread
.join();
413 BOOST_TEST_PASSPOINT();
414 boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
416 BOOST_TEST_PASSPOINT();
418 feeder_queue
.stop_local();
419 BOOST_TEST_PASSPOINT();
420 feeder_thread
.join();
422 boost::atomic_thread_fence(boost::memory_order_acquire
);
424 BOOST_CHECK_EQUAL(feeder_results
[0], queue_t::succeeded
);
425 BOOST_CHECK_EQUAL(feeder_results
[1], queue_t::succeeded
);
426 BOOST_CHECK_EQUAL(feeder_results
[2], queue_t::aborted
);
427 BOOST_CHECK_EQUAL(reader_results
[0], queue_t::succeeded
);
429 // Reset the aborted queue
430 feeder_queue
.reset_local();
431 feeder_queue
.clear();
433 std::fill_n(feeder_results
, sizeof(feeder_results
) / sizeof(*feeder_results
), queue_t::succeeded
);
434 std::fill_n(reader_results
, sizeof(reader_results
) / sizeof(*reader_results
), queue_t::succeeded
);
435 boost::atomic_thread_fence(boost::memory_order_release
);
437 BOOST_TEST_PASSPOINT();
439 // Case 2: Let the reader block and then we unblock it with stop_local()
440 boost::thread(&stop_reset_feeding_thread
, boost::ref(feeder_queue
), feeder_results
, 1).swap(feeder_thread
);
441 boost::thread(&stop_reset_reading_thread
, boost::ref(reader_queue
), reader_results
, 2).swap(reader_thread
);
443 BOOST_TEST_PASSPOINT();
445 feeder_thread
.join();
446 BOOST_TEST_PASSPOINT();
447 boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
449 BOOST_TEST_PASSPOINT();
451 reader_queue
.stop_local();
452 BOOST_TEST_PASSPOINT();
453 reader_thread
.join();
455 boost::atomic_thread_fence(boost::memory_order_acquire
);
457 BOOST_CHECK_EQUAL(feeder_results
[0], queue_t::succeeded
);
458 BOOST_CHECK_EQUAL(feeder_results
[1], queue_t::succeeded
);
459 BOOST_CHECK_EQUAL(reader_results
[0], queue_t::succeeded
);
460 BOOST_CHECK_EQUAL(reader_results
[1], queue_t::aborted
);
463 #endif // !defined(BOOST_LOG_NO_THREADS)