]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/log/test/run/util_ipc_reliable_mq.cpp
d4b68ff86503e468e4506d9120604fadcaf0d57e
[ceph.git] / ceph / src / boost / libs / log / test / run / util_ipc_reliable_mq.cpp
1 /*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8 /*!
9 * \file util_ipc_reliable_mq.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 19.10.2015
13 *
14 * \brief The test verifies that \c ipc::reliable_message_queue works.
15 */
16
17 #define BOOST_TEST_MODULE util_ipc_reliable_mq
18
19 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
20 #include <boost/log/utility/ipc/object_name.hpp>
21 #include <boost/log/utility/permissions.hpp>
22 #include <boost/log/utility/open_mode.hpp>
23 #include <boost/log/exceptions.hpp>
24 #include <boost/test/unit_test.hpp>
25 #include <cstddef>
26 #include <cstring>
27 #include <string>
28 #include <vector>
29 #include <iostream>
30 #include <stdexcept>
31 #include <boost/move/utility_core.hpp>
32 #if !defined(BOOST_LOG_NO_THREADS)
33 #include <algorithm>
34 #include <boost/ref.hpp>
35 #include <boost/atomic/fences.hpp>
36 #include <boost/thread/thread.hpp>
37 #include <boost/chrono/duration.hpp>
38 #endif
39 #include "char_definitions.hpp"
40
41 typedef boost::log::ipc::reliable_message_queue queue_t;
42 typedef queue_t::size_type size_type;
43
44 const boost::log::ipc::object_name ipc_queue_name(boost::log::ipc::object_name::session, "boost_log_test_ipc_reliable_mq");
45 const unsigned int capacity = 512;
46 const size_type block_size = 1024;
47 const char message1[] = "Hello, world!";
48 const char message2[] = "Hello, the brand new world!";
49
50 BOOST_AUTO_TEST_CASE(basic_functionality)
51 {
52 // Default constructor.
53 {
54 queue_t queue;
55 BOOST_CHECK(!queue.is_open());
56 }
57
58 // Do a remove in case if a previous test failed
59 queue_t::remove(ipc_queue_name);
60
61 // Opening a non-existing queue
62 try
63 {
64 queue_t queue(boost::log::open_mode::open_only, ipc_queue_name);
65 BOOST_FAIL("Non-existing queue open succeeded, although it shouldn't have");
66 }
67 catch (std::exception&)
68 {
69 BOOST_TEST_PASSPOINT();
70 }
71
72 // Create constructor and destructor.
73 {
74 queue_t queue(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
75 BOOST_CHECK(equal_strings(queue.name().c_str(), ipc_queue_name.c_str()));
76 BOOST_CHECK(queue.is_open());
77 BOOST_CHECK_EQUAL(queue.capacity(), capacity);
78 BOOST_CHECK_EQUAL(queue.block_size(), block_size);
79 }
80
81 // Creating a duplicate queue
82 try
83 {
84 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
85 queue_t queue_b(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
86 BOOST_FAIL("Creating a duplicate queue succeeded, although it shouldn't have");
87 }
88 catch (std::exception&)
89 {
90 BOOST_TEST_PASSPOINT();
91 }
92
93 // Opening an existing queue
94 {
95 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
96 BOOST_CHECK(queue_a.is_open());
97
98 queue_t queue_b(boost::log::open_mode::open_or_create, ipc_queue_name, capacity * 2u, block_size * 2u); // queue geometry differs from the existing queue
99 BOOST_CHECK(queue_b.is_open());
100 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
101 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
102 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
103
104 queue_t queue_c(boost::log::open_mode::open_only, ipc_queue_name);
105 BOOST_CHECK(queue_c.is_open());
106 BOOST_CHECK(equal_strings(queue_c.name().c_str(), ipc_queue_name.c_str()));
107 BOOST_CHECK_EQUAL(queue_c.capacity(), capacity);
108 BOOST_CHECK_EQUAL(queue_c.block_size(), block_size);
109 }
110 // Closing a queue
111 {
112 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
113 BOOST_CHECK(queue_a.is_open());
114 queue_a.close();
115 BOOST_CHECK(!queue_a.is_open());
116 // Duplicate close()
117 queue_a.close();
118 BOOST_CHECK(!queue_a.is_open());
119 }
120 // Move constructor.
121 {
122 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
123 queue_t queue_b(boost::move(queue_a));
124 BOOST_CHECK(!queue_a.is_open());
125 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
126 BOOST_CHECK(queue_b.is_open());
127 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
128 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
129 }
130 // Move assignment operator.
131 {
132 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
133 queue_t queue_b;
134 queue_b = boost::move(queue_a);
135 BOOST_CHECK(!queue_a.is_open());
136 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
137 BOOST_CHECK(queue_b.is_open());
138 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
139 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
140 }
141 // Member and non-member swaps.
142 {
143 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
144 queue_a.swap(queue_a);
145 BOOST_CHECK(queue_a.is_open());
146 BOOST_CHECK(equal_strings(queue_a.name().c_str(), ipc_queue_name.c_str()));
147 BOOST_CHECK_EQUAL(queue_a.capacity(), capacity);
148 BOOST_CHECK_EQUAL(queue_a.block_size(), block_size);
149
150 queue_t queue_b;
151 swap(queue_a, queue_b);
152 BOOST_CHECK(!queue_a.is_open());
153 BOOST_CHECK(queue_b.is_open());
154 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
155 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
156 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
157 }
158 }
159
160 BOOST_AUTO_TEST_CASE(message_passing)
161 {
162 // try_send() and try_receive()
163 {
164 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
165 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
166 BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
167 BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
168 char buffer[block_size] = {};
169 size_type message_size = 0u;
170 BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
171 BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
172 BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
173 BOOST_CHECK(!queue_b.try_receive(buffer, sizeof(buffer), message_size));
174
175 BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
176 std::string msg;
177 BOOST_CHECK(queue_b.try_receive(msg));
178 BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
179 BOOST_CHECK_EQUAL(msg, message2);
180
181 BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
182 std::vector< unsigned char > buf;
183 BOOST_CHECK(queue_b.try_receive(buf));
184 BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
185 BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
186 }
187
188 // send() and receive() without blocking
189 {
190 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
191 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
192 BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
193 char buffer[block_size] = {};
194 size_type message_size = 0u;
195 BOOST_CHECK(queue_b.receive(buffer, sizeof(buffer), message_size) == queue_t::succeeded);
196 BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
197 BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
198
199 BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
200 std::string msg;
201 BOOST_CHECK(queue_b.receive(msg) == queue_t::succeeded);
202 BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
203 BOOST_CHECK_EQUAL(msg, message2);
204
205 BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
206 std::vector< unsigned char > buf;
207 BOOST_CHECK(queue_b.receive(buf) == queue_t::succeeded);
208 BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
209 BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
210 }
211
212 // send() with an error code on overflow
213 {
214 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::fail_on_overflow);
215 BOOST_TEST_PASSPOINT();
216 BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
217 BOOST_TEST_PASSPOINT();
218
219 queue_t::operation_result res = queue_a.send(message1, sizeof(message1) - 1u);
220 BOOST_CHECK_EQUAL(res, queue_t::no_space);
221 }
222
223 // send() with an exception on overflow
224 {
225 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::throw_on_overflow);
226 BOOST_TEST_PASSPOINT();
227 BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
228 BOOST_TEST_PASSPOINT();
229 try
230 {
231 queue_a.send(message1, sizeof(message1) - 1u);
232 BOOST_FAIL("Owerflowing the queue succeeded, although it shouldn't have");
233 }
234 catch (boost::log::capacity_limit_reached&)
235 {
236 BOOST_TEST_PASSPOINT();
237 }
238 }
239
240 // send() and receive() for messages larger than block_size. The message size and queue capacity below are such
241 // that the last enqueued message is expected to be split in the queue storage.
242 {
243 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 5u, block_size);
244 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
245
246 const size_type message_size = block_size * 3u / 2u;
247 std::vector< unsigned char > send_data;
248 send_data.resize(message_size);
249 for (unsigned int i = 0; i < message_size; ++i)
250 send_data[i] = static_cast< unsigned char >(i & 0xFF);
251
252 BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
253
254 for (unsigned int i = 0; i < 3; ++i)
255 {
256 BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
257
258 std::vector< unsigned char > receive_data;
259 BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
260 BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
261 }
262
263 std::vector< unsigned char > receive_data;
264 BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
265 BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
266 }
267
268 // clear()
269 {
270 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
271 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
272 BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
273 BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
274
275 queue_a.clear();
276
277 BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
278 char buffer[block_size] = {};
279 size_type message_size = 0u;
280 BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
281 BOOST_CHECK_EQUAL(message_size, sizeof(message2) - 1u);
282 BOOST_CHECK(std::memcmp(buffer, message2, message_size) == 0);
283 }
284 }
285
286 #if !defined(BOOST_LOG_NO_THREADS)
287
288 namespace {
289
290 const unsigned int message_count = 100000;
291
292 void multithreaded_message_passing_feeding_thread(const char* message, unsigned int& failure_count)
293 {
294 const size_type len = static_cast< size_type >(std::strlen(message));
295 queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
296 for (unsigned int i = 0; i < message_count; ++i)
297 {
298 failure_count += queue.send(message, len) != queue_t::succeeded;
299 }
300
301 boost::atomic_thread_fence(boost::memory_order_release);
302 }
303
304 } // namespace
305
306 BOOST_AUTO_TEST_CASE(multithreaded_message_passing)
307 {
308 unsigned int failure_count1 = 0, failure_count2 = 0, failure_count3 = 0;
309 boost::atomic_thread_fence(boost::memory_order_release);
310
311 boost::thread thread1(&multithreaded_message_passing_feeding_thread, "Thread 1", boost::ref(failure_count1));
312 boost::thread thread2(&multithreaded_message_passing_feeding_thread, "Thread 2", boost::ref(failure_count2));
313 boost::thread thread3(&multithreaded_message_passing_feeding_thread, "Thread 3", boost::ref(failure_count3));
314
315 BOOST_TEST_PASSPOINT();
316
317 queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
318 unsigned int receive_failures = 0, receive_corruptions = 0;
319 unsigned int message_count1 = 0, message_count2 = 0, message_count3 = 0;
320 std::string msg;
321
322 BOOST_TEST_PASSPOINT();
323
324 for (unsigned int i = 0; i < message_count * 3u; ++i)
325 {
326 msg.clear();
327 if (queue.receive(msg) == queue_t::succeeded)
328 {
329 if (msg == "Thread 1")
330 ++message_count1;
331 else if (msg == "Thread 2")
332 ++message_count2;
333 else if (msg == "Thread 3")
334 ++message_count3;
335 else
336 ++receive_corruptions;
337 }
338 else
339 ++receive_failures;
340 }
341
342 BOOST_TEST_PASSPOINT();
343 thread1.join();
344
345 BOOST_TEST_PASSPOINT();
346 thread2.join();
347
348 BOOST_TEST_PASSPOINT();
349 thread3.join();
350
351 boost::atomic_thread_fence(boost::memory_order_acquire);
352
353 BOOST_CHECK_EQUAL(failure_count1, 0u);
354 BOOST_CHECK_EQUAL(message_count1, message_count);
355 BOOST_CHECK_EQUAL(failure_count2, 0u);
356 BOOST_CHECK_EQUAL(message_count2, message_count);
357 BOOST_CHECK_EQUAL(failure_count3, 0u);
358 BOOST_CHECK_EQUAL(message_count3, message_count);
359 BOOST_CHECK_EQUAL(receive_failures, 0u);
360 BOOST_CHECK_EQUAL(receive_corruptions, 0u);
361 }
362
363 namespace {
364
365 void stop_reset_feeding_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
366 {
367 for (unsigned int i = 0; i < count; ++i)
368 {
369 results[i] = queue.send(message1, sizeof(message1) - 1u);
370 if (results[i] != queue_t::succeeded)
371 break;
372 }
373
374 boost::atomic_thread_fence(boost::memory_order_release);
375 }
376
377 void stop_reset_reading_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
378 {
379 std::string msg;
380 for (unsigned int i = 0; i < count; ++i)
381 {
382 msg.clear();
383 results[i] = queue.receive(msg);
384 if (results[i] != queue_t::succeeded)
385 break;
386 }
387
388 boost::atomic_thread_fence(boost::memory_order_release);
389 }
390
391 } // namespace
392
393 BOOST_AUTO_TEST_CASE(stop_reset_local)
394 {
395 queue_t feeder_queue(boost::log::open_mode::open_or_create, ipc_queue_name, 1u, block_size);
396 queue_t::operation_result feeder_results[3];
397 queue_t reader_queue(boost::log::open_mode::open_only, ipc_queue_name);
398 queue_t::operation_result reader_results[3];
399
400 std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
401 std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
402 boost::atomic_thread_fence(boost::memory_order_release);
403
404 BOOST_TEST_PASSPOINT();
405
406 // Case 1: Let the feeder block and then we unblock it with stop_local()
407 boost::thread feeder_thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 3);
408 boost::thread reader_thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 1);
409
410 BOOST_TEST_PASSPOINT();
411
412 reader_thread.join();
413 BOOST_TEST_PASSPOINT();
414 boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
415
416 BOOST_TEST_PASSPOINT();
417
418 feeder_queue.stop_local();
419 BOOST_TEST_PASSPOINT();
420 feeder_thread.join();
421
422 boost::atomic_thread_fence(boost::memory_order_acquire);
423
424 BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
425 BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
426 BOOST_CHECK_EQUAL(feeder_results[2], queue_t::aborted);
427 BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
428
429 // Reset the aborted queue
430 feeder_queue.reset_local();
431 feeder_queue.clear();
432
433 std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
434 std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
435 boost::atomic_thread_fence(boost::memory_order_release);
436
437 BOOST_TEST_PASSPOINT();
438
439 // Case 2: Let the reader block and then we unblock it with stop_local()
440 boost::thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 1).swap(feeder_thread);
441 boost::thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 2).swap(reader_thread);
442
443 BOOST_TEST_PASSPOINT();
444
445 feeder_thread.join();
446 BOOST_TEST_PASSPOINT();
447 boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
448
449 BOOST_TEST_PASSPOINT();
450
451 reader_queue.stop_local();
452 BOOST_TEST_PASSPOINT();
453 reader_thread.join();
454
455 boost::atomic_thread_fence(boost::memory_order_acquire);
456
457 BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
458 BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
459 BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
460 BOOST_CHECK_EQUAL(reader_results[1], queue_t::aborted);
461 }
462
463 #endif // !defined(BOOST_LOG_NO_THREADS)