2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
9 * \file posix/ipc_reliable_message_queue.cpp
11 * \author Andrey Semashev
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
17 * This file provides an interprocess message queue implementation on POSIX platforms.
20 #include <boost/log/detail/config.hpp>
29 #if defined(BOOST_HAS_SCHED_YIELD)
31 #elif defined(BOOST_HAS_PTHREAD_YIELD)
33 #elif defined(BOOST_HAS_NANOSLEEP)
36 #include <boost/assert.hpp>
37 #include <boost/static_assert.hpp>
38 #include <boost/cstdint.hpp>
39 #include <boost/atomic/atomic.hpp>
40 #include <boost/atomic/capabilities.hpp>
41 #include <boost/throw_exception.hpp>
42 #include <boost/log/exceptions.hpp>
43 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
44 #include <boost/log/support/exception.hpp>
45 #include <boost/log/detail/pause.hpp>
46 #include <boost/exception/info.hpp>
47 #include <boost/exception/enable_error_info.hpp>
48 #include <boost/interprocess/creation_tags.hpp>
49 #include <boost/interprocess/exceptions.hpp>
50 #include <boost/interprocess/permissions.hpp>
51 #include <boost/interprocess/mapped_region.hpp>
52 #include <boost/interprocess/shared_memory_object.hpp>
53 #include <boost/align/align_up.hpp>
54 #include "ipc_sync_wrappers.hpp"
55 #include "murmur3.hpp"
56 #include "bit_tools.hpp"
57 #include <boost/log/detail/header.hpp>
59 #if BOOST_ATOMIC_INT32_LOCK_FREE != 2
60 // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
61 #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
66 BOOST_LOG_OPEN_NAMESPACE
70 //! Message queue implementation data
71 struct reliable_message_queue::implementation
74 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
77 // Element data alignment, in bytes
78 enum { data_alignment
= 32u };
80 //! Size of the element data, in bytes
83 //! Returns the block header overhead, in bytes
84 static BOOST_CONSTEXPR size_type
get_header_overhead() BOOST_NOEXCEPT
86 return static_cast< size_type
>(boost::alignment::align_up(sizeof(block_header
), data_alignment
));
89 //! Returns a pointer to the element data
90 void* get_data() const BOOST_NOEXCEPT
92 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
96 //! Header of the message queue. Placed at the beginning of the shared memory segment.
99 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
100 enum { abi_version
= 0 };
102 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
104 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
106 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
107 unsigned char m_padding
[BOOST_LOG_CPU_CACHE_LINE_SIZE
- sizeof(uint32_t)];
108 //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
109 boost::atomic
< uint32_t > m_ref_count
;
110 //! Number of allocation blocks in the queue.
111 const uint32_t m_capacity
;
112 //! Size of an allocation block, in bytes.
113 const size_type m_block_size
;
114 //! Mutex for protecting queue data structures.
115 boost::log::ipc::aux::interprocess_mutex m_mutex
;
116 //! Condition variable used to block readers when the queue is empty.
117 boost::log::ipc::aux::interprocess_condition_variable m_nonempty_queue
;
118 //! Condition variable used to block writers when the queue is full.
119 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue
;
120 //! The current number of allocated blocks in the queue.
122 //! The current writing position (allocation block index).
124 //! The current reading position (allocation block index).
127 header(uint32_t capacity
, size_type block_size
) :
128 m_abi_tag(get_abi_tag()),
129 m_capacity(capacity
),
130 m_block_size(block_size
),
135 // Must be initialized last. m_ref_count is zero-initialized initially.
136 m_ref_count
.fetch_add(1u, boost::memory_order_release
);
139 //! Returns the header structure ABI tag
140 static uint32_t get_abi_tag() BOOST_NOEXCEPT
142 // This FOURCC identifies the queue type
143 boost::log::aux::murmur3_32
hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
145 // This FOURCC identifies the queue implementation
146 hash
.mix(boost::log::aux::make_fourcc('p', 't', 'h', 'r'));
147 hash
.mix(abi_version
);
149 // We will use these constants to align pointers
150 hash
.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE
);
151 hash
.mix(block_header::data_alignment
);
153 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
154 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
156 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
157 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
158 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
160 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag
);
161 BOOST_LOG_MIX_HEADER_MEMBER(m_padding
);
162 BOOST_LOG_MIX_HEADER_MEMBER(m_ref_count
);
163 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity
);
164 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size
);
165 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex
);
166 BOOST_LOG_MIX_HEADER_MEMBER(m_nonempty_queue
);
167 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue
);
168 BOOST_LOG_MIX_HEADER_MEMBER(m_size
);
169 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos
);
170 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos
);
172 #undef BOOST_LOG_MIX_HEADER_MEMBER
174 return hash
.finalize();
177 //! Returns an element header at the specified index
178 block_header
* get_block(uint32_t index
) const BOOST_NOEXCEPT
180 BOOST_ASSERT(index
< m_capacity
);
181 unsigned char* p
= const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header
), BOOST_LOG_CPU_CACHE_LINE_SIZE
);
182 p
+= static_cast< std::size_t >(m_block_size
) * static_cast< std::size_t >(index
);
183 return reinterpret_cast< block_header
* >(p
);
186 BOOST_DELETED_FUNCTION(header(header
const&))
187 BOOST_DELETED_FUNCTION(header
& operator=(header
const&))
191 //! Shared memory object
192 boost::interprocess::shared_memory_object m_shared_memory
;
193 //! Shared memory mapping into the process address space
194 boost::interprocess::mapped_region m_region
;
195 //! Queue overflow handling policy
196 const overflow_policy m_overflow_policy
;
197 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
198 size_type m_block_size_mask
;
199 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
200 uint32_t m_block_size_log2
;
201 //! The flag indicates that stop has been requested
204 //! Queue shared memory object name
205 const object_name m_name
;
208 //! The constructor creates a new shared memory segment
211 open_mode::create_only_tag
,
212 object_name
const& name
,
214 size_type block_size
,
215 overflow_policy oflow_policy
,
216 permissions
const& perms
218 m_shared_memory(boost::interprocess::create_only
, name
.c_str(), boost::interprocess::read_write
, boost::interprocess::permissions(perms
.get_native())),
220 m_overflow_policy(oflow_policy
),
221 m_block_size_mask(0u),
222 m_block_size_log2(0u),
226 create_region(capacity
, block_size
);
229 //! The constructor creates a new shared memory segment or opens the existing one
232 open_mode::open_or_create_tag
,
233 object_name
const& name
,
235 size_type block_size
,
236 overflow_policy oflow_policy
,
237 permissions
const& perms
239 m_shared_memory(boost::interprocess::open_or_create
, name
.c_str(), boost::interprocess::read_write
, boost::interprocess::permissions(perms
.get_native())),
241 m_overflow_policy(oflow_policy
),
242 m_block_size_mask(0u),
243 m_block_size_log2(0u),
247 boost::interprocess::offset_t shmem_size
= 0;
248 if (!m_shared_memory
.get_size(shmem_size
) || shmem_size
== 0)
249 create_region(capacity
, block_size
);
251 adopt_region(shmem_size
);
254 //! The constructor opens the existing shared memory segment
257 open_mode::open_only_tag
,
258 object_name
const& name
,
259 overflow_policy oflow_policy
261 m_shared_memory(boost::interprocess::open_only
, name
.c_str(), boost::interprocess::read_write
),
263 m_overflow_policy(oflow_policy
),
264 m_block_size_mask(0u),
265 m_block_size_log2(0u),
269 boost::interprocess::offset_t shmem_size
= 0;
270 if (!m_shared_memory
.get_size(shmem_size
))
271 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment not found");
273 adopt_region(shmem_size
);
281 object_name
const& name() const BOOST_NOEXCEPT
286 uint32_t capacity() const BOOST_NOEXCEPT
288 return get_header()->m_capacity
;
291 size_type
block_size() const BOOST_NOEXCEPT
293 return get_header()->m_block_size
;
296 operation_result
send(void const* message_data
, size_type message_size
)
298 const uint32_t block_count
= estimate_block_count(message_size
);
300 header
* const hdr
= get_header();
302 if (BOOST_UNLIKELY(block_count
> hdr
->m_capacity
))
303 BOOST_LOG_THROW_DESCR(logic_error
, "Message size exceeds the interprocess queue capacity");
309 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(hdr
->m_mutex
);
316 if ((hdr
->m_capacity
- hdr
->m_size
) >= block_count
)
319 const overflow_policy oflow_policy
= m_overflow_policy
;
320 if (oflow_policy
== fail_on_overflow
)
322 else if (BOOST_UNLIKELY(oflow_policy
== throw_on_overflow
))
323 BOOST_LOG_THROW_DESCR(capacity_limit_reached
, "Interprocess queue is full");
325 hdr
->m_nonfull_queue
.wait(hdr
->m_mutex
);
328 enqueue_message(message_data
, message_size
, block_count
);
333 bool try_send(void const* message_data
, size_type message_size
)
335 const uint32_t block_count
= estimate_block_count(message_size
);
337 header
* const hdr
= get_header();
339 if (BOOST_UNLIKELY(block_count
> hdr
->m_capacity
))
340 BOOST_LOG_THROW_DESCR(logic_error
, "Message size exceeds the interprocess queue capacity");
346 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(hdr
->m_mutex
);
351 if ((hdr
->m_capacity
- hdr
->m_size
) < block_count
)
354 enqueue_message(message_data
, message_size
, block_count
);
359 operation_result
receive(receive_handler handler
, void* state
)
365 header
* const hdr
= get_header();
366 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(hdr
->m_mutex
);
373 if (hdr
->m_size
> 0u)
376 hdr
->m_nonempty_queue
.wait(hdr
->m_mutex
);
379 dequeue_message(handler
, state
);
384 bool try_receive(receive_handler handler
, void* state
)
390 header
* const hdr
= get_header();
391 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(hdr
->m_mutex
);
393 if (hdr
->m_size
== 0u)
396 dequeue_message(handler
, state
);
407 header
* const hdr
= get_header();
408 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(hdr
->m_mutex
);
412 hdr
->m_nonempty_queue
.notify_all();
413 hdr
->m_nonfull_queue
.notify_all();
424 header
* const hdr
= get_header();
425 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(hdr
->m_mutex
);
430 header
* get_header() const BOOST_NOEXCEPT
432 return static_cast< header
* >(m_region
.get_address());
435 static std::size_t estimate_region_size(uint32_t capacity
, size_type block_size
) BOOST_NOEXCEPT
437 return boost::alignment::align_up(sizeof(header
), BOOST_LOG_CPU_CACHE_LINE_SIZE
) + static_cast< std::size_t >(capacity
) * static_cast< std::size_t >(block_size
);
440 void create_region(uint32_t capacity
, size_type block_size
)
442 const std::size_t shmem_size
= estimate_region_size(capacity
, block_size
);
443 m_shared_memory
.truncate(shmem_size
);
444 boost::interprocess::mapped_region(m_shared_memory
, boost::interprocess::read_write
, 0u, shmem_size
).swap(m_region
);
446 new (m_region
.get_address()) header(capacity
, block_size
);
448 init_block_size(block_size
);
451 void adopt_region(std::size_t shmem_size
)
453 if (shmem_size
< sizeof(header
))
454 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
456 boost::interprocess::mapped_region(m_shared_memory
, boost::interprocess::read_write
, 0u, shmem_size
).swap(m_region
);
458 // Wait until the mapped region becomes initialized
459 header
* const hdr
= get_header();
460 BOOST_CONSTEXPR_OR_CONST
unsigned int wait_loops
= 200u, spin_loops
= 16u, spins
= 16u;
461 for (unsigned int i
= 0; i
< wait_loops
; ++i
)
463 uint32_t ref_count
= hdr
->m_ref_count
.load(boost::memory_order_acquire
);
464 while (ref_count
> 0u)
466 if (hdr
->m_ref_count
.compare_exchange_weak(ref_count
, ref_count
+ 1u, boost::memory_order_acq_rel
, boost::memory_order_acquire
))
472 for (unsigned int j
= 0; j
< spins
; ++j
)
474 boost::log::aux::pause();
479 #if defined(BOOST_HAS_SCHED_YIELD)
481 #elif defined(BOOST_HAS_PTHREAD_YIELD)
483 #elif defined(BOOST_HAS_NANOSLEEP)
487 nanosleep(&ts
, NULL
);
494 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
499 // Check that the queue layout matches the current process ABI
500 if (hdr
->m_abi_tag
!= header::get_abi_tag())
501 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
503 if (!boost::log::aux::is_power_of_2(hdr
->m_block_size
))
504 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
506 init_block_size(hdr
->m_block_size
);
515 void close_region() BOOST_NOEXCEPT
517 header
* const hdr
= get_header();
519 if (hdr
->m_ref_count
.fetch_sub(1u, boost::memory_order_acq_rel
) == 1u)
521 boost::interprocess::shared_memory_object::remove(m_shared_memory
.get_name());
525 boost::interprocess::mapped_region().swap(m_region
);
526 boost::interprocess::shared_memory_object().swap(m_shared_memory
);
528 m_block_size_mask
= 0u;
529 m_block_size_log2
= 0u;
533 void init_block_size(size_type block_size
)
535 m_block_size_mask
= block_size
- 1u;
537 uint32_t block_size_log2
= 0u;
538 if ((block_size
& 0x0000ffff) == 0u)
541 block_size_log2
+= 16u;
543 if ((block_size
& 0x000000ff) == 0u)
546 block_size_log2
+= 8u;
548 if ((block_size
& 0x0000000f) == 0u)
551 block_size_log2
+= 4u;
553 if ((block_size
& 0x00000003) == 0u)
556 block_size_log2
+= 2u;
558 if ((block_size
& 0x00000001) == 0u)
562 m_block_size_log2
= block_size_log2
;
567 header
* const hdr
= get_header();
569 #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
574 #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
576 catch (boost::log::ipc::aux::lock_owner_dead
&)
578 // The mutex is locked by the current thread, but the previous owner terminated without releasing the lock
582 hdr
->m_mutex
.recover();
586 hdr
->m_mutex
.unlock();
595 header
* const hdr
= get_header();
599 hdr
->m_nonfull_queue
.notify_all();
602 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
603 uint32_t estimate_block_count(size_type size
) const BOOST_NOEXCEPT
605 // ceil((size + get_header_overhead()) / block_size)
606 return static_cast< uint32_t >((size
+ block_header::get_header_overhead() + m_block_size_mask
) >> m_block_size_log2
);
609 //! Puts the message to the back of the queue
610 void enqueue_message(void const* message_data
, size_type message_size
, uint32_t block_count
)
612 header
* const hdr
= get_header();
614 const uint32_t capacity
= hdr
->m_capacity
;
615 const size_type block_size
= hdr
->m_block_size
;
616 uint32_t pos
= hdr
->m_put_pos
;
618 block_header
* block
= hdr
->get_block(pos
);
619 block
->m_size
= message_size
;
621 size_type write_size
= (std::min
)(static_cast< size_type
>((capacity
- pos
) * block_size
- block_header::get_header_overhead()), message_size
);
622 std::memcpy(block
->get_data(), message_data
, write_size
);
625 if (BOOST_UNLIKELY(pos
>= capacity
))
627 // Write the rest of the message at the beginning of the queue
629 message_data
= static_cast< const unsigned char* >(message_data
) + write_size
;
630 write_size
= message_size
- write_size
;
632 std::memcpy(hdr
->get_block(0u), message_data
, write_size
);
635 hdr
->m_put_pos
= pos
;
637 const uint32_t old_queue_size
= hdr
->m_size
;
638 hdr
->m_size
= old_queue_size
+ block_count
;
639 if (old_queue_size
== 0u)
640 hdr
->m_nonempty_queue
.notify_one();
643 //! Retrieves the next message and invokes the handler to store the message contents
644 void dequeue_message(receive_handler handler
, void* state
)
646 header
* const hdr
= get_header();
648 const uint32_t capacity
= hdr
->m_capacity
;
649 const size_type block_size
= hdr
->m_block_size
;
650 uint32_t pos
= hdr
->m_get_pos
;
652 block_header
* block
= hdr
->get_block(pos
);
653 size_type message_size
= block
->m_size
;
654 uint32_t block_count
= estimate_block_count(message_size
);
656 BOOST_ASSERT(block_count
<= hdr
->m_size
);
658 size_type read_size
= (std::min
)(static_cast< size_type
>((capacity
- pos
) * block_size
- block_header::get_header_overhead()), message_size
);
659 handler(state
, block
->get_data(), read_size
);
662 if (BOOST_UNLIKELY(pos
>= capacity
))
664 // Read the tail of the message
666 read_size
= message_size
- read_size
;
668 handler(state
, hdr
->get_block(0u), read_size
);
671 hdr
->m_get_pos
= pos
;
672 hdr
->m_size
-= block_count
;
674 hdr
->m_nonfull_queue
.notify_all();
678 BOOST_LOG_API
void reliable_message_queue::create(object_name
const& name
, uint32_t capacity
, size_type block_size
, overflow_policy oflow_policy
, permissions
const& perms
)
680 BOOST_ASSERT(m_impl
== NULL
);
681 if (!boost::log::aux::is_power_of_2(block_size
))
682 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
685 m_impl
= new implementation(open_mode::create_only
, name
, capacity
, static_cast< size_type
>(boost::alignment::align_up(block_size
, BOOST_LOG_CPU_CACHE_LINE_SIZE
)), oflow_policy
, perms
);
687 catch (boost::exception
& e
)
689 e
<< boost::log::ipc::object_name_info(name
);
692 catch (boost::interprocess::interprocess_exception
& e
)
694 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e
.get_native_error(), boost::system::system_category()), e
.what())) << boost::log::ipc::object_name_info(name
));
698 BOOST_LOG_API
void reliable_message_queue::open_or_create(object_name
const& name
, uint32_t capacity
, size_type block_size
, overflow_policy oflow_policy
, permissions
const& perms
)
700 BOOST_ASSERT(m_impl
== NULL
);
701 if (!boost::log::aux::is_power_of_2(block_size
))
702 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
705 m_impl
= new implementation(open_mode::open_or_create
, name
, capacity
, static_cast< size_type
>(boost::alignment::align_up(block_size
, BOOST_LOG_CPU_CACHE_LINE_SIZE
)), oflow_policy
, perms
);
707 catch (boost::exception
& e
)
709 e
<< boost::log::ipc::object_name_info(name
);
712 catch (boost::interprocess::interprocess_exception
& e
)
714 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e
.get_native_error(), boost::system::system_category()), e
.what())) << boost::log::ipc::object_name_info(name
));
718 BOOST_LOG_API
void reliable_message_queue::open(object_name
const& name
, overflow_policy oflow_policy
, permissions
const&)
720 BOOST_ASSERT(m_impl
== NULL
);
723 m_impl
= new implementation(open_mode::open_only
, name
, oflow_policy
);
725 catch (boost::exception
& e
)
727 e
<< boost::log::ipc::object_name_info(name
);
730 catch (boost::interprocess::interprocess_exception
& e
)
732 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e
.get_native_error(), boost::system::system_category()), e
.what())) << boost::log::ipc::object_name_info(name
));
736 BOOST_LOG_API
void reliable_message_queue::clear()
738 BOOST_ASSERT(m_impl
!= NULL
);
743 catch (boost::exception
& e
)
745 e
<< boost::log::ipc::object_name_info(m_impl
->name());
750 BOOST_LOG_API object_name
const& reliable_message_queue::name() const
752 BOOST_ASSERT(m_impl
!= NULL
);
753 return m_impl
->name();
756 BOOST_LOG_API
uint32_t reliable_message_queue::capacity() const
758 BOOST_ASSERT(m_impl
!= NULL
);
759 return m_impl
->capacity();
762 BOOST_LOG_API
reliable_message_queue::size_type
reliable_message_queue::block_size() const
764 BOOST_ASSERT(m_impl
!= NULL
);
765 return m_impl
->block_size();
768 BOOST_LOG_API
void reliable_message_queue::stop_local()
770 BOOST_ASSERT(m_impl
!= NULL
);
773 m_impl
->stop_local();
775 catch (boost::exception
& e
)
777 e
<< boost::log::ipc::object_name_info(m_impl
->name());
782 BOOST_LOG_API
void reliable_message_queue::reset_local()
784 BOOST_ASSERT(m_impl
!= NULL
);
787 m_impl
->reset_local();
789 catch (boost::exception
& e
)
791 e
<< boost::log::ipc::object_name_info(m_impl
->name());
796 BOOST_LOG_API
void reliable_message_queue::do_close() BOOST_NOEXCEPT
802 BOOST_LOG_API
reliable_message_queue::operation_result
reliable_message_queue::send(void const* message_data
, size_type message_size
)
804 BOOST_ASSERT(m_impl
!= NULL
);
807 return m_impl
->send(message_data
, message_size
);
809 catch (boost::exception
& e
)
811 e
<< boost::log::ipc::object_name_info(m_impl
->name());
816 BOOST_LOG_API
bool reliable_message_queue::try_send(void const* message_data
, size_type message_size
)
818 BOOST_ASSERT(m_impl
!= NULL
);
821 return m_impl
->try_send(message_data
, message_size
);
823 catch (boost::exception
& e
)
825 e
<< boost::log::ipc::object_name_info(m_impl
->name());
830 BOOST_LOG_API
reliable_message_queue::operation_result
reliable_message_queue::do_receive(receive_handler handler
, void* state
)
832 BOOST_ASSERT(m_impl
!= NULL
);
835 return m_impl
->receive(handler
, state
);
837 catch (boost::exception
& e
)
839 e
<< boost::log::ipc::object_name_info(m_impl
->name());
844 BOOST_LOG_API
bool reliable_message_queue::do_try_receive(receive_handler handler
, void* state
)
846 BOOST_ASSERT(m_impl
!= NULL
);
849 return m_impl
->try_receive(handler
, state
);
851 catch (boost::exception
& e
)
853 e
<< boost::log::ipc::object_name_info(m_impl
->name());
858 //! Fixed buffer receive handler
859 BOOST_LOG_API
void reliable_message_queue::fixed_buffer_receive_handler(void* state
, const void* data
, size_type size
)
861 fixed_buffer_state
* p
= static_cast< fixed_buffer_state
* >(state
);
862 if (BOOST_UNLIKELY(size
> p
->size
))
863 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
865 std::memcpy(p
->data
, data
, size
);
870 BOOST_LOG_API
void reliable_message_queue::remove(object_name
const& name
)
872 boost::interprocess::shared_memory_object::remove(name
.c_str());
877 BOOST_LOG_CLOSE_NAMESPACE
// namespace log
881 #include <boost/log/detail/footer.hpp>