]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/log/src/posix/ipc_reliable_message_queue.cpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / libs / log / src / posix / ipc_reliable_message_queue.cpp
CommitLineData
7c673cae
FG
1/*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8/*!
9 * \file posix/ipc_reliable_message_queue.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 17.11.2015
13 *
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16 *
17 * This file provides an interprocess message queue implementation on POSIX platforms.
18 */
19
20#include <boost/log/detail/config.hpp>
21#include <cstddef>
22#include <cerrno>
23#include <cstring>
1e59de90 24#include <ctime>
7c673cae
FG
25#include <new>
26#include <string>
27#include <stdexcept>
28#include <algorithm>
29#include <unistd.h>
30#if defined(BOOST_HAS_SCHED_YIELD)
31#include <sched.h>
32#elif defined(BOOST_HAS_PTHREAD_YIELD)
33#include <pthread.h>
34#elif defined(BOOST_HAS_NANOSLEEP)
35#include <time.h>
36#endif
37#include <boost/assert.hpp>
38#include <boost/static_assert.hpp>
39#include <boost/cstdint.hpp>
1e59de90 40#include <boost/memory_order.hpp>
7c673cae 41#include <boost/atomic/atomic.hpp>
1e59de90 42#include <boost/atomic/ipc_atomic.hpp>
7c673cae
FG
43#include <boost/atomic/capabilities.hpp>
44#include <boost/throw_exception.hpp>
45#include <boost/log/exceptions.hpp>
46#include <boost/log/utility/ipc/reliable_message_queue.hpp>
47#include <boost/log/support/exception.hpp>
48#include <boost/log/detail/pause.hpp>
49#include <boost/exception/info.hpp>
50#include <boost/exception/enable_error_info.hpp>
51#include <boost/interprocess/creation_tags.hpp>
52#include <boost/interprocess/exceptions.hpp>
53#include <boost/interprocess/permissions.hpp>
54#include <boost/interprocess/mapped_region.hpp>
55#include <boost/interprocess/shared_memory_object.hpp>
56#include <boost/align/align_up.hpp>
57#include "ipc_sync_wrappers.hpp"
58#include "murmur3.hpp"
59#include "bit_tools.hpp"
60#include <boost/log/detail/header.hpp>
61
7c673cae
FG
62namespace boost {
63
64BOOST_LOG_OPEN_NAMESPACE
65
66namespace ipc {
67
68//! Message queue implementation data
69struct reliable_message_queue::implementation
70{
71private:
72 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
73 struct block_header
74 {
75 // Element data alignment, in bytes
76 enum { data_alignment = 32u };
77
78 //! Size of the element data, in bytes
79 size_type m_size;
80
81 //! Returns the block header overhead, in bytes
82 static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
83 {
84 return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment));
85 }
86
87 //! Returns a pointer to the element data
88 void* get_data() const BOOST_NOEXCEPT
89 {
90 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
91 }
92 };
93
94 //! Header of the message queue. Placed at the beginning of the shared memory segment.
95 struct header
96 {
97 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
98 enum { abi_version = 0 };
99
100 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
101
102 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
103 uint32_t m_abi_tag;
104 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
105 unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
106 //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
1e59de90 107 boost::ipc_atomic< uint32_t > m_ref_count;
7c673cae
FG
108 //! Number of allocation blocks in the queue.
109 const uint32_t m_capacity;
110 //! Size of an allocation block, in bytes.
111 const size_type m_block_size;
112 //! Mutex for protecting queue data structures.
113 boost::log::ipc::aux::interprocess_mutex m_mutex;
114 //! Condition variable used to block readers when the queue is empty.
115 boost::log::ipc::aux::interprocess_condition_variable m_nonempty_queue;
116 //! Condition variable used to block writers when the queue is full.
117 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
118 //! The current number of allocated blocks in the queue.
119 uint32_t m_size;
120 //! The current writing position (allocation block index).
121 uint32_t m_put_pos;
122 //! The current reading position (allocation block index).
123 uint32_t m_get_pos;
124
125 header(uint32_t capacity, size_type block_size) :
126 m_abi_tag(get_abi_tag()),
127 m_capacity(capacity),
128 m_block_size(block_size),
129 m_size(0u),
130 m_put_pos(0u),
131 m_get_pos(0u)
132 {
133 // Must be initialized last. m_ref_count is zero-initialized initially.
1e59de90 134 m_ref_count.opaque_add(1u, boost::memory_order_release);
7c673cae
FG
135 }
136
137 //! Returns the header structure ABI tag
138 static uint32_t get_abi_tag() BOOST_NOEXCEPT
139 {
140 // This FOURCC identifies the queue type
141 boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
142
143 // This FOURCC identifies the queue implementation
144 hash.mix(boost::log::aux::make_fourcc('p', 't', 'h', 'r'));
145 hash.mix(abi_version);
146
147 // We will use these constants to align pointers
148 hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
149 hash.mix(block_header::data_alignment);
150
151 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
152 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
153
154#define BOOST_LOG_MIX_HEADER_MEMBER(name)\
155 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
156 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
157
158 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
159 BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
160 BOOST_LOG_MIX_HEADER_MEMBER(m_ref_count);
161 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
162 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
163 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex);
164 BOOST_LOG_MIX_HEADER_MEMBER(m_nonempty_queue);
165 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue);
166 BOOST_LOG_MIX_HEADER_MEMBER(m_size);
167 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
168 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
169
170#undef BOOST_LOG_MIX_HEADER_MEMBER
171
172 return hash.finalize();
173 }
174
175 //! Returns an element header at the specified index
176 block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
177 {
178 BOOST_ASSERT(index < m_capacity);
179 unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
180 p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
181 return reinterpret_cast< block_header* >(p);
182 }
183
184 BOOST_DELETED_FUNCTION(header(header const&))
185 BOOST_DELETED_FUNCTION(header& operator=(header const&))
186 };
187
188private:
189 //! Shared memory object
190 boost::interprocess::shared_memory_object m_shared_memory;
191 //! Shared memory mapping into the process address space
192 boost::interprocess::mapped_region m_region;
193 //! Queue overflow handling policy
194 const overflow_policy m_overflow_policy;
195 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
196 size_type m_block_size_mask;
197 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
198 uint32_t m_block_size_log2;
199 //! The flag indicates that stop has been requested
1e59de90 200 boost::atomic< bool > m_stop;
7c673cae
FG
201
202 //! Queue shared memory object name
203 const object_name m_name;
204
1e59de90
TL
205 //! The total number of loop iterations in \c adopt_region for waiting for the region initialization to complete
206 static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops = 200u;
207 //! Threshold of the number of loop iterations in \c adopt_region for using pause instructions for yielding
208 static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_pause = 16u;
209 //! Threshold of the number of loop iterations in \c adopt_region for using \c short_yield for yielding
210 static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_short_yield = 64u;
211 //! Timeout, in seconds, for performing shared memory creation/opening loop
212 static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_timeout = 60u;
213 //! The number of short yields to perform during the shared memory creation/opening loop
214 static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_short_yield_loops = 64u;
215
7c673cae
FG
216public:
217 //! The constructor creates a new shared memory segment
218 implementation
219 (
220 open_mode::create_only_tag,
221 object_name const& name,
222 uint32_t capacity,
223 size_type block_size,
224 overflow_policy oflow_policy,
225 permissions const& perms
226 ) :
1e59de90 227 m_shared_memory(),
7c673cae
FG
228 m_region(),
229 m_overflow_policy(oflow_policy),
230 m_block_size_mask(0u),
231 m_block_size_log2(0u),
232 m_stop(false),
233 m_name(name)
234 {
1e59de90
TL
235 BOOST_ASSERT(block_size >= block_header::get_header_overhead());
236
237 boost::interprocess::permissions ipc_perms(perms.get_native());
238 while (true)
239 {
240 try
241 {
242 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
243 m_shared_memory.swap(shared_memory);
244 break;
245 }
246 catch (boost::interprocess::interprocess_exception& e)
247 {
248 // shared_memory_object does not handle EINTR returned from shm_open internally.
249 // https://github.com/boostorg/interprocess/issues/152
250 if (e.get_native_error() != EINTR)
251 throw;
252 }
253 }
254
7c673cae
FG
255 create_region(capacity, block_size);
256 }
257
258 //! The constructor creates a new shared memory segment or opens the existing one
259 implementation
260 (
261 open_mode::open_or_create_tag,
262 object_name const& name,
263 uint32_t capacity,
264 size_type block_size,
265 overflow_policy oflow_policy,
266 permissions const& perms
267 ) :
1e59de90 268 m_shared_memory(),
7c673cae
FG
269 m_region(),
270 m_overflow_policy(oflow_policy),
271 m_block_size_mask(0u),
272 m_block_size_log2(0u),
273 m_stop(false),
274 m_name(name)
275 {
1e59de90
TL
276 BOOST_ASSERT(block_size >= block_header::get_header_overhead());
277
278 // We need to know for certain whether we create the shared memory segment or open an existing one.
279 // This is to ensure that only one thread initializes the segment and all other threads wait until completion.
280 // Since shared_memory_object(open_or_create) constructor does not report whether the segment was actually created,
281 // we have to loop trying to create or open the segment. https://github.com/boostorg/interprocess/issues/151
282 boost::interprocess::permissions ipc_perms(perms.get_native());
283 bool created = false;
284 unsigned int i = 0u;
285 std::time_t start_time = std::time(NULL);
286 while (true)
287 {
288 while (true)
289 {
290 try
291 {
292 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
293 m_shared_memory.swap(shared_memory);
294 created = true;
295 goto done;
296 }
297 catch (boost::interprocess::interprocess_exception& e)
298 {
299 if (e.get_error_code() == boost::interprocess::already_exists_error)
300 break;
301
302 // shared_memory_object does not handle EINTR returned from shm_open internally.
303 // https://github.com/boostorg/interprocess/issues/152
304 if (e.get_native_error() != EINTR)
305 throw;
306 }
307 }
308
309 while (true)
310 {
311 try
312 {
313 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
314 m_shared_memory.swap(shared_memory);
315 created = false;
316 goto done;
317 }
318 catch (boost::interprocess::interprocess_exception& e)
319 {
320 if (e.get_error_code() == boost::interprocess::not_found_error)
321 break;
322
323 if (e.get_native_error() != EINTR)
324 throw;
325 }
326 }
327
328 std::time_t now = std::time(NULL);
329 if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
330 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be created or opened: shared memory segment failed to be created or opened until timeout (possible livelock)");
331
332 if (i < region_open_or_create_short_yield_loops)
333 short_yield();
334 else
335 long_yield();
336
337 ++i;
338 }
339
340 done:
341 if (created)
7c673cae
FG
342 create_region(capacity, block_size);
343 else
1e59de90 344 adopt_region();
7c673cae
FG
345 }
346
347 //! The constructor opens the existing shared memory segment
348 implementation
349 (
350 open_mode::open_only_tag,
351 object_name const& name,
352 overflow_policy oflow_policy
353 ) :
1e59de90 354 m_shared_memory(),
7c673cae
FG
355 m_region(),
356 m_overflow_policy(oflow_policy),
357 m_block_size_mask(0u),
358 m_block_size_log2(0u),
359 m_stop(false),
360 m_name(name)
361 {
1e59de90
TL
362 while (true)
363 {
364 try
365 {
366 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
367 m_shared_memory.swap(shared_memory);
368 break;
369 }
370 catch (boost::interprocess::interprocess_exception& e)
371 {
372 // shared_memory_object does not handle EINTR returned from shm_open internally.
373 // https://github.com/boostorg/interprocess/issues/152
374 if (e.get_native_error() != EINTR)
375 throw;
376 }
377 }
7c673cae 378
1e59de90 379 adopt_region();
7c673cae
FG
380 }
381
382 ~implementation()
383 {
384 close_region();
385 }
386
387 object_name const& name() const BOOST_NOEXCEPT
388 {
389 return m_name;
390 }
391
392 uint32_t capacity() const BOOST_NOEXCEPT
393 {
394 return get_header()->m_capacity;
395 }
396
397 size_type block_size() const BOOST_NOEXCEPT
398 {
399 return get_header()->m_block_size;
400 }
401
402 operation_result send(void const* message_data, size_type message_size)
403 {
404 const uint32_t block_count = estimate_block_count(message_size);
405
406 header* const hdr = get_header();
407
408 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
409 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
410
1e59de90 411 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
412 return aborted;
413
414 lock_queue();
415 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
416
417 while (true)
418 {
1e59de90 419 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
420 return aborted;
421
422 if ((hdr->m_capacity - hdr->m_size) >= block_count)
423 break;
424
425 const overflow_policy oflow_policy = m_overflow_policy;
426 if (oflow_policy == fail_on_overflow)
427 return no_space;
428 else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
429 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
430
431 hdr->m_nonfull_queue.wait(hdr->m_mutex);
432 }
433
434 enqueue_message(message_data, message_size, block_count);
435
436 return succeeded;
437 }
438
439 bool try_send(void const* message_data, size_type message_size)
440 {
441 const uint32_t block_count = estimate_block_count(message_size);
442
443 header* const hdr = get_header();
444
445 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
446 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
447
1e59de90 448 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
449 return false;
450
451 lock_queue();
452 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
453
1e59de90 454 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
455 return false;
456
457 if ((hdr->m_capacity - hdr->m_size) < block_count)
458 return false;
459
460 enqueue_message(message_data, message_size, block_count);
461
462 return true;
463 }
464
465 operation_result receive(receive_handler handler, void* state)
466 {
1e59de90 467 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
468 return aborted;
469
470 lock_queue();
471 header* const hdr = get_header();
472 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
473
474 while (true)
475 {
1e59de90 476 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
477 return aborted;
478
479 if (hdr->m_size > 0u)
480 break;
481
482 hdr->m_nonempty_queue.wait(hdr->m_mutex);
483 }
484
485 dequeue_message(handler, state);
486
487 return succeeded;
488 }
489
490 bool try_receive(receive_handler handler, void* state)
491 {
1e59de90 492 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
493 return false;
494
495 lock_queue();
496 header* const hdr = get_header();
497 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
498
499 if (hdr->m_size == 0u)
500 return false;
501
502 dequeue_message(handler, state);
503
504 return true;
505 }
506
507 void stop_local()
508 {
1e59de90 509 if (m_stop.load(boost::memory_order_relaxed))
7c673cae
FG
510 return;
511
512 lock_queue();
513 header* const hdr = get_header();
514 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
515
1e59de90 516 m_stop.store(true, boost::memory_order_relaxed);
7c673cae
FG
517
518 hdr->m_nonempty_queue.notify_all();
519 hdr->m_nonfull_queue.notify_all();
520 }
521
522 void reset_local()
523 {
1e59de90 524 m_stop.store(false, boost::memory_order_relaxed);
7c673cae
FG
525 }
526
527 void clear()
528 {
529 lock_queue();
530 header* const hdr = get_header();
531 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
532 clear_queue();
533 }
534
535private:
536 header* get_header() const BOOST_NOEXCEPT
537 {
538 return static_cast< header* >(m_region.get_address());
539 }
540
541 static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
542 {
543 return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
544 }
545
546 void create_region(uint32_t capacity, size_type block_size)
547 {
548 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
549 m_shared_memory.truncate(shmem_size);
550 boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
551
552 new (m_region.get_address()) header(capacity, block_size);
553
554 init_block_size(block_size);
555 }
556
1e59de90 557 void adopt_region()
7c673cae 558 {
1e59de90
TL
559 std::size_t shmem_size = 0u;
560 unsigned int i = 0u;
561 std::time_t start_time = std::time(NULL);
562 while (true)
563 {
564 boost::interprocess::offset_t shm_size = 0;
565 const bool get_size_result = m_shared_memory.get_size(shm_size);
566 if (BOOST_LIKELY(get_size_result && shm_size > 0))
567 {
568 shmem_size = static_cast< std::size_t >(shm_size);
569 break;
570 }
571
572 std::time_t now = std::time(NULL);
573 if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
574 {
575 if (get_size_result)
576 goto shmem_size_too_small;
577 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size could not be determined until timeout");
578 }
579
580 if (i < region_open_or_create_short_yield_loops)
581 short_yield();
582 else
583 long_yield();
584
585 ++i;
586 }
587
588 if (BOOST_UNLIKELY(shmem_size < sizeof(header)))
589 {
590 shmem_size_too_small:
7c673cae 591 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
1e59de90 592 }
7c673cae 593
1e59de90 594 boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0, shmem_size).swap(m_region);
7c673cae
FG
595
596 // Wait until the mapped region becomes initialized
597 header* const hdr = get_header();
1e59de90 598 for (i = 0u; i < region_init_wait_loops; ++i)
7c673cae
FG
599 {
600 uint32_t ref_count = hdr->m_ref_count.load(boost::memory_order_acquire);
601 while (ref_count > 0u)
602 {
603 if (hdr->m_ref_count.compare_exchange_weak(ref_count, ref_count + 1u, boost::memory_order_acq_rel, boost::memory_order_acquire))
604 goto done;
605 }
606
1e59de90
TL
607 if (i < region_init_wait_loops_pause)
608 boost::log::aux::pause();
609 else if (i < region_init_wait_loops_short_yield)
610 short_yield();
7c673cae 611 else
1e59de90 612 long_yield();
7c673cae
FG
613 }
614
615 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
616
617 done:
618 try
619 {
620 // Check that the queue layout matches the current process ABI
621 if (hdr->m_abi_tag != header::get_abi_tag())
622 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
623
624 if (!boost::log::aux::is_power_of_2(hdr->m_block_size))
625 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
626
627 init_block_size(hdr->m_block_size);
628 }
629 catch (...)
630 {
631 close_region();
632 throw;
633 }
634 }
635
636 void close_region() BOOST_NOEXCEPT
637 {
638 header* const hdr = get_header();
639
640 if (hdr->m_ref_count.fetch_sub(1u, boost::memory_order_acq_rel) == 1u)
641 {
642 boost::interprocess::shared_memory_object::remove(m_shared_memory.get_name());
643
644 hdr->~header();
645
646 boost::interprocess::mapped_region().swap(m_region);
647 boost::interprocess::shared_memory_object().swap(m_shared_memory);
648
649 m_block_size_mask = 0u;
650 m_block_size_log2 = 0u;
651 }
652 }
653
654 void init_block_size(size_type block_size)
655 {
656 m_block_size_mask = block_size - 1u;
657
658 uint32_t block_size_log2 = 0u;
659 if ((block_size & 0x0000ffff) == 0u)
660 {
661 block_size >>= 16u;
662 block_size_log2 += 16u;
663 }
664 if ((block_size & 0x000000ff) == 0u)
665 {
666 block_size >>= 8u;
667 block_size_log2 += 8u;
668 }
669 if ((block_size & 0x0000000f) == 0u)
670 {
671 block_size >>= 4u;
672 block_size_log2 += 4u;
673 }
674 if ((block_size & 0x00000003) == 0u)
675 {
676 block_size >>= 2u;
677 block_size_log2 += 2u;
678 }
679 if ((block_size & 0x00000001) == 0u)
680 {
681 ++block_size_log2;
682 }
683 m_block_size_log2 = block_size_log2;
684 }
685
686 void lock_queue()
687 {
688 header* const hdr = get_header();
689
690#if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
691 try
692 {
693#endif
694 hdr->m_mutex.lock();
695#if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
696 }
697 catch (boost::log::ipc::aux::lock_owner_dead&)
698 {
699 // The mutex is locked by the current thread, but the previous owner terminated without releasing the lock
700 try
701 {
702 clear_queue();
703 hdr->m_mutex.recover();
704 }
705 catch (...)
706 {
707 hdr->m_mutex.unlock();
708 throw;
709 }
710 }
711#endif
712 }
713
714 void clear_queue()
715 {
716 header* const hdr = get_header();
717 hdr->m_size = 0u;
718 hdr->m_put_pos = 0u;
719 hdr->m_get_pos = 0u;
720 hdr->m_nonfull_queue.notify_all();
721 }
722
723 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
724 uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
725 {
726 // ceil((size + get_header_overhead()) / block_size)
727 return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
728 }
729
730 //! Puts the message to the back of the queue
731 void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
732 {
733 header* const hdr = get_header();
734
735 const uint32_t capacity = hdr->m_capacity;
736 const size_type block_size = hdr->m_block_size;
737 uint32_t pos = hdr->m_put_pos;
1e59de90 738 BOOST_ASSERT(pos < capacity);
7c673cae
FG
739
740 block_header* block = hdr->get_block(pos);
741 block->m_size = message_size;
742
743 size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
744 std::memcpy(block->get_data(), message_data, write_size);
745
746 pos += block_count;
747 if (BOOST_UNLIKELY(pos >= capacity))
748 {
749 // Write the rest of the message at the beginning of the queue
750 pos -= capacity;
751 message_data = static_cast< const unsigned char* >(message_data) + write_size;
752 write_size = message_size - write_size;
753 if (write_size > 0u)
754 std::memcpy(hdr->get_block(0u), message_data, write_size);
755 }
756
757 hdr->m_put_pos = pos;
758
759 const uint32_t old_queue_size = hdr->m_size;
760 hdr->m_size = old_queue_size + block_count;
761 if (old_queue_size == 0u)
762 hdr->m_nonempty_queue.notify_one();
763 }
764
765 //! Retrieves the next message and invokes the handler to store the message contents
766 void dequeue_message(receive_handler handler, void* state)
767 {
768 header* const hdr = get_header();
769
770 const uint32_t capacity = hdr->m_capacity;
771 const size_type block_size = hdr->m_block_size;
772 uint32_t pos = hdr->m_get_pos;
1e59de90 773 BOOST_ASSERT(pos < capacity);
7c673cae
FG
774
775 block_header* block = hdr->get_block(pos);
776 size_type message_size = block->m_size;
777 uint32_t block_count = estimate_block_count(message_size);
778
779 BOOST_ASSERT(block_count <= hdr->m_size);
780
781 size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
782 handler(state, block->get_data(), read_size);
783
784 pos += block_count;
785 if (BOOST_UNLIKELY(pos >= capacity))
786 {
787 // Read the tail of the message
788 pos -= capacity;
789 read_size = message_size - read_size;
790 if (read_size > 0u)
791 handler(state, hdr->get_block(0u), read_size);
792 }
793
794 hdr->m_get_pos = pos;
795 hdr->m_size -= block_count;
796
797 hdr->m_nonfull_queue.notify_all();
798 }
1e59de90
TL
799
800 static void short_yield() BOOST_NOEXCEPT
801 {
802#if defined(BOOST_HAS_SCHED_YIELD)
803 sched_yield();
804#elif defined(BOOST_HAS_PTHREAD_YIELD)
805 pthread_yield();
806#else
807 long_yield();
808#endif
809 }
810
811 static void long_yield() BOOST_NOEXCEPT
812 {
813#if defined(BOOST_HAS_NANOSLEEP)
814 timespec ts = {};
815 ts.tv_sec = 0;
816 ts.tv_nsec = 1000;
817 nanosleep(&ts, NULL);
818#else
819 usleep(1);
820#endif
821 }
7c673cae
FG
822};
823
824BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
825{
826 BOOST_ASSERT(m_impl == NULL);
827 if (!boost::log::aux::is_power_of_2(block_size))
828 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
829 try
830 {
831 m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
832 }
833 catch (boost::exception& e)
834 {
835 e << boost::log::ipc::object_name_info(name);
836 throw;
837 }
838 catch (boost::interprocess::interprocess_exception& e)
839 {
840 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
841 }
842}
843
844BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
845{
846 BOOST_ASSERT(m_impl == NULL);
847 if (!boost::log::aux::is_power_of_2(block_size))
848 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
849 try
850 {
851 m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
852 }
853 catch (boost::exception& e)
854 {
855 e << boost::log::ipc::object_name_info(name);
856 throw;
857 }
858 catch (boost::interprocess::interprocess_exception& e)
859 {
860 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
861 }
862}
863
864BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const&)
865{
866 BOOST_ASSERT(m_impl == NULL);
867 try
868 {
869 m_impl = new implementation(open_mode::open_only, name, oflow_policy);
870 }
871 catch (boost::exception& e)
872 {
873 e << boost::log::ipc::object_name_info(name);
874 throw;
875 }
876 catch (boost::interprocess::interprocess_exception& e)
877 {
878 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
879 }
880}
881
882BOOST_LOG_API void reliable_message_queue::clear()
883{
884 BOOST_ASSERT(m_impl != NULL);
885 try
886 {
887 m_impl->clear();
888 }
889 catch (boost::exception& e)
890 {
891 e << boost::log::ipc::object_name_info(m_impl->name());
892 throw;
893 }
894}
895
896BOOST_LOG_API object_name const& reliable_message_queue::name() const
897{
898 BOOST_ASSERT(m_impl != NULL);
899 return m_impl->name();
900}
901
902BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
903{
904 BOOST_ASSERT(m_impl != NULL);
905 return m_impl->capacity();
906}
907
908BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
909{
910 BOOST_ASSERT(m_impl != NULL);
911 return m_impl->block_size();
912}
913
914BOOST_LOG_API void reliable_message_queue::stop_local()
915{
916 BOOST_ASSERT(m_impl != NULL);
917 try
918 {
919 m_impl->stop_local();
920 }
921 catch (boost::exception& e)
922 {
923 e << boost::log::ipc::object_name_info(m_impl->name());
924 throw;
925 }
926}
927
928BOOST_LOG_API void reliable_message_queue::reset_local()
929{
930 BOOST_ASSERT(m_impl != NULL);
931 try
932 {
933 m_impl->reset_local();
934 }
935 catch (boost::exception& e)
936 {
937 e << boost::log::ipc::object_name_info(m_impl->name());
938 throw;
939 }
940}
941
942BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
943{
944 delete m_impl;
945 m_impl = NULL;
946}
947
948BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
949{
950 BOOST_ASSERT(m_impl != NULL);
951 try
952 {
953 return m_impl->send(message_data, message_size);
954 }
955 catch (boost::exception& e)
956 {
957 e << boost::log::ipc::object_name_info(m_impl->name());
958 throw;
959 }
960}
961
962BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
963{
964 BOOST_ASSERT(m_impl != NULL);
965 try
966 {
967 return m_impl->try_send(message_data, message_size);
968 }
969 catch (boost::exception& e)
970 {
971 e << boost::log::ipc::object_name_info(m_impl->name());
972 throw;
973 }
974}
975
976BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
977{
978 BOOST_ASSERT(m_impl != NULL);
979 try
980 {
981 return m_impl->receive(handler, state);
982 }
983 catch (boost::exception& e)
984 {
985 e << boost::log::ipc::object_name_info(m_impl->name());
986 throw;
987 }
988}
989
990BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
991{
992 BOOST_ASSERT(m_impl != NULL);
993 try
994 {
995 return m_impl->try_receive(handler, state);
996 }
997 catch (boost::exception& e)
998 {
999 e << boost::log::ipc::object_name_info(m_impl->name());
1000 throw;
1001 }
1002}
1003
1004//! Fixed buffer receive handler
1005BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
1006{
1007 fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
1008 if (BOOST_UNLIKELY(size > p->size))
1009 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
1010
1011 std::memcpy(p->data, data, size);
1012 p->data += size;
1013 p->size -= size;
1014}
1015
1016BOOST_LOG_API void reliable_message_queue::remove(object_name const& name)
1017{
1018 boost::interprocess::shared_memory_object::remove(name.c_str());
1019}
1020
1021} // namespace ipc
1022
1023BOOST_LOG_CLOSE_NAMESPACE // namespace log
1024
1025} // namespace boost
1026
1027#include <boost/log/detail/footer.hpp>