2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
9 * \file ipc_reliable_message_queue_win.hpp
11 * \author Andrey Semashev
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
17 * This file provides an interprocess message queue implementation on POSIX platforms.
20 #include <boost/log/detail/config.hpp>
28 #include <boost/assert.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/cstdint.hpp>
31 #include <boost/memory_order.hpp>
32 #include <boost/atomic/ipc_atomic.hpp>
33 #include <boost/atomic/capabilities.hpp>
34 #include <boost/log/exceptions.hpp>
35 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
36 #include <boost/log/support/exception.hpp>
37 #include <boost/log/detail/pause.hpp>
38 #include <boost/exception/info.hpp>
39 #include <boost/exception/enable_error_info.hpp>
40 #include <boost/align/align_up.hpp>
41 #include <boost/winapi/thread.hpp> // SwitchToThread
42 #include "windows/ipc_sync_wrappers.hpp"
43 #include "windows/mapped_shared_memory.hpp"
44 #include "windows/utf_code_conversion.hpp"
45 #include "murmur3.hpp"
46 #include "bit_tools.hpp"
48 #include <boost/log/detail/header.hpp>
50 //! A suffix used in names of interprocess objects created by the queue.
51 //! Used as a protection against clashing with user-supplied names of interprocess queues and also to resolve conflicts between queues of different types.
52 #define BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".3010b9950926463398eee00b35b44651"
56 BOOST_LOG_OPEN_NAMESPACE
60 //! Message queue implementation data
61 struct reliable_message_queue::implementation
64 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
67 // Element data alignment, in bytes
68 enum { data_alignment
= 32u };
70 //! Size of the element data, in bytes
73 //! Returns the block header overhead, in bytes
74 static BOOST_CONSTEXPR size_type
get_header_overhead() BOOST_NOEXCEPT
76 return static_cast< size_type
>(boost::alignment::align_up(sizeof(block_header
), data_alignment
));
79 //! Returns a pointer to the element data
80 void* get_data() const BOOST_NOEXCEPT
82 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
86 //! Header of the message queue. Placed at the beginning of the shared memory segment.
89 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
90 enum { abi_version
= 0 };
92 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
94 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
96 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
97 unsigned char m_padding
[BOOST_LOG_CPU_CACHE_LINE_SIZE
- sizeof(uint32_t)];
98 //! A flag indicating that the queue is constructed (i.e. the queue is constructed when the value is not 0).
99 boost::ipc_atomic
< uint32_t > m_initialized
;
100 //! Number of allocation blocks in the queue.
101 const uint32_t m_capacity
;
102 //! Size of an allocation block, in bytes.
103 const size_type m_block_size
;
104 //! Shared state of the mutex for protecting queue data structures.
105 boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state
;
106 //! Shared state of the condition variable used to block writers when the queue is full.
107 boost::log::ipc::aux::interprocess_condition_variable::shared_state m_nonfull_queue_state
;
108 //! The current number of allocated blocks in the queue.
110 //! The current writing position (allocation block index).
112 //! The current reading position (allocation block index).
115 header(uint32_t capacity
, size_type block_size
) :
116 m_abi_tag(get_abi_tag()),
117 m_capacity(capacity
),
118 m_block_size(block_size
),
123 // Must be initialized last. m_initialized is zero-initialized initially.
124 m_initialized
.opaque_add(1u, boost::memory_order_release
);
127 //! Returns the header structure ABI tag
128 static uint32_t get_abi_tag() BOOST_NOEXCEPT
130 // This FOURCC identifies the queue type
131 boost::log::aux::murmur3_32
hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
133 // This FOURCC identifies the queue implementation
134 hash
.mix(boost::log::aux::make_fourcc('w', 'n', 't', '5'));
135 hash
.mix(abi_version
);
137 // We will use these constants to align pointers
138 hash
.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE
);
139 hash
.mix(block_header::data_alignment
);
141 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
142 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
144 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
145 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
146 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
148 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag
);
149 BOOST_LOG_MIX_HEADER_MEMBER(m_padding
);
150 BOOST_LOG_MIX_HEADER_MEMBER(m_initialized
);
151 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity
);
152 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size
);
153 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex_state
);
154 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue_state
);
155 BOOST_LOG_MIX_HEADER_MEMBER(m_size
);
156 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos
);
157 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos
);
159 #undef BOOST_LOG_MIX_HEADER_MEMBER
161 return hash
.finalize();
164 //! Returns an element header at the specified index
165 block_header
* get_block(uint32_t index
) const BOOST_NOEXCEPT
167 BOOST_ASSERT(index
< m_capacity
);
168 unsigned char* p
= const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header
), BOOST_LOG_CPU_CACHE_LINE_SIZE
);
169 p
+= static_cast< std::size_t >(m_block_size
) * static_cast< std::size_t >(index
);
170 return reinterpret_cast< block_header
* >(p
);
173 BOOST_DELETED_FUNCTION(header(header
const&))
174 BOOST_DELETED_FUNCTION(header
& operator=(header
const&))
178 //! Shared memory object and mapping
179 boost::log::ipc::aux::mapped_shared_memory m_shared_memory
;
180 //! Queue overflow handling policy
181 const overflow_policy m_overflow_policy
;
182 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
183 size_type m_block_size_mask
;
184 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
185 uint32_t m_block_size_log2
;
187 //! Mutex for protecting queue data structures.
188 boost::log::ipc::aux::interprocess_mutex m_mutex
;
189 //! Event used to block readers when the queue is empty.
190 boost::log::ipc::aux::interprocess_event m_nonempty_queue
;
191 //! Condition variable used to block writers when the queue is full.
192 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue
;
193 //! The event indicates that stop has been requested
194 boost::log::ipc::aux::auto_handle m_stop
;
196 //! The queue name, as specified by the user
197 const object_name m_name
;
200 //! The constructor creates a new shared memory segment
203 open_mode::create_only_tag
,
204 object_name
const& name
,
206 size_type block_size
,
207 overflow_policy oflow_policy
,
208 permissions
const& perms
210 m_overflow_policy(oflow_policy
),
211 m_block_size_mask(0u),
212 m_block_size_log2(0u),
215 BOOST_ASSERT(block_size
>= block_header::get_header_overhead());
216 const std::wstring wname
= boost::log::aux::utf8_to_utf16(name
.c_str());
217 const std::size_t shmem_size
= estimate_region_size(capacity
, block_size
);
218 m_shared_memory
.create(wname
.c_str(), shmem_size
, perms
);
219 m_shared_memory
.map();
221 create_queue(wname
, capacity
, block_size
, perms
);
224 //! The constructor creates a new shared memory segment or opens the existing one
227 open_mode::open_or_create_tag
,
228 object_name
const& name
,
230 size_type block_size
,
231 overflow_policy oflow_policy
,
232 permissions
const& perms
234 m_overflow_policy(oflow_policy
),
235 m_block_size_mask(0u),
236 m_block_size_log2(0u),
239 BOOST_ASSERT(block_size
>= block_header::get_header_overhead());
240 const std::wstring wname
= boost::log::aux::utf8_to_utf16(name
.c_str());
241 const std::size_t shmem_size
= estimate_region_size(capacity
, block_size
);
242 const bool created
= m_shared_memory
.create_or_open(wname
.c_str(), shmem_size
, perms
);
243 m_shared_memory
.map();
246 create_queue(wname
, capacity
, block_size
, perms
);
248 adopt_queue(wname
, m_shared_memory
.size(), perms
);
251 //! The constructor opens the existing shared memory segment
254 open_mode::open_only_tag
,
255 object_name
const& name
,
256 overflow_policy oflow_policy
,
257 permissions
const& perms
259 m_overflow_policy(oflow_policy
),
260 m_block_size_mask(0u),
261 m_block_size_log2(0u),
264 const std::wstring wname
= boost::log::aux::utf8_to_utf16(name
.c_str());
265 m_shared_memory
.open(wname
.c_str());
266 m_shared_memory
.map();
268 adopt_queue(wname
, m_shared_memory
.size(), perms
);
271 object_name
const& name() const BOOST_NOEXCEPT
276 uint32_t capacity() const BOOST_NOEXCEPT
278 return get_header()->m_capacity
;
281 size_type
block_size() const BOOST_NOEXCEPT
283 return get_header()->m_block_size
;
286 operation_result
send(void const* message_data
, size_type message_size
)
288 const uint32_t block_count
= estimate_block_count(message_size
);
290 header
* const hdr
= get_header();
292 if (BOOST_UNLIKELY(block_count
> hdr
->m_capacity
))
293 BOOST_LOG_THROW_DESCR(logic_error
, "Message size exceeds the interprocess queue capacity");
298 boost::log::ipc::aux::interprocess_mutex::optional_unlock
unlock(m_mutex
);
302 if ((hdr
->m_capacity
- hdr
->m_size
) >= block_count
)
305 const overflow_policy oflow_policy
= m_overflow_policy
;
306 if (oflow_policy
== fail_on_overflow
)
308 else if (BOOST_UNLIKELY(oflow_policy
== throw_on_overflow
))
309 BOOST_LOG_THROW_DESCR(capacity_limit_reached
, "Interprocess queue is full");
311 if (!m_nonfull_queue
.wait(unlock
, m_stop
.get()))
315 enqueue_message(message_data
, message_size
, block_count
);
320 bool try_send(void const* message_data
, size_type message_size
)
322 const uint32_t block_count
= estimate_block_count(message_size
);
324 header
* const hdr
= get_header();
326 if (BOOST_UNLIKELY(block_count
> hdr
->m_capacity
))
327 BOOST_LOG_THROW_DESCR(logic_error
, "Message size exceeds the interprocess queue capacity");
332 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(m_mutex
);
334 if ((hdr
->m_capacity
- hdr
->m_size
) < block_count
)
337 enqueue_message(message_data
, message_size
, block_count
);
342 operation_result
receive(receive_handler handler
, void* state
)
347 boost::log::ipc::aux::interprocess_mutex::optional_unlock
unlock(m_mutex
);
349 header
* const hdr
= get_header();
353 if (hdr
->m_size
> 0u)
359 if (!m_nonempty_queue
.wait(m_stop
.get()) || !lock_queue())
362 unlock
.engage(m_mutex
);
365 dequeue_message(handler
, state
);
370 bool try_receive(receive_handler handler
, void* state
)
375 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(m_mutex
);
377 header
* const hdr
= get_header();
378 if (hdr
->m_size
== 0u)
381 dequeue_message(handler
, state
);
388 BOOST_VERIFY(boost::winapi::SetEvent(m_stop
.get()) != 0);
393 BOOST_VERIFY(boost::winapi::ResetEvent(m_stop
.get()) != 0);
399 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(m_mutex
);
404 header
* get_header() const BOOST_NOEXCEPT
406 return static_cast< header
* >(m_shared_memory
.address());
409 static std::size_t estimate_region_size(uint32_t capacity
, size_type block_size
) BOOST_NOEXCEPT
411 return boost::alignment::align_up(sizeof(header
), BOOST_LOG_CPU_CACHE_LINE_SIZE
) + static_cast< std::size_t >(capacity
) * static_cast< std::size_t >(block_size
);
414 void create_stop_event()
416 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
417 boost::winapi::HANDLE_ h
= boost::winapi::CreateEventExW
421 boost::winapi::CREATE_EVENT_MANUAL_RESET_
,
422 boost::winapi::SYNCHRONIZE_
| boost::winapi::EVENT_MODIFY_STATE_
425 boost::winapi::HANDLE_ h
= boost::winapi::CreateEventW
428 true, // manual reset
429 false, // initial state
433 if (BOOST_UNLIKELY(h
== NULL
))
435 boost::winapi::DWORD_ err
= boost::winapi::GetLastError();
436 BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error
, "Failed to create an stop event object", (err
));
442 void create_queue(std::wstring
const& name
, uint32_t capacity
, size_type block_size
, permissions
const& perms
)
444 // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized
445 header
* const hdr
= get_header();
446 m_mutex
.create((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".mutex").c_str(), &hdr
->m_mutex_state
, perms
);
447 m_nonempty_queue
.create((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonempty_queue_event").c_str(), false, perms
);
448 m_nonfull_queue
.init((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonfull_queue_cond_var").c_str(), &hdr
->m_nonfull_queue_state
, perms
);
451 new (hdr
) header(capacity
, block_size
);
453 init_block_size(block_size
);
456 void adopt_queue(std::wstring
const& name
, std::size_t shmem_size
, permissions
const& perms
)
458 if (shmem_size
< sizeof(header
))
459 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
461 // Wait until the mapped region becomes initialized
462 header
* const hdr
= get_header();
463 BOOST_CONSTEXPR_OR_CONST
unsigned int wait_loops
= 1000u, spin_loops
= 16u, spins
= 16u;
464 for (unsigned int i
= 0; i
< wait_loops
; ++i
)
466 uint32_t initialized
= hdr
->m_initialized
.load(boost::memory_order_acquire
);
474 for (unsigned int j
= 0; j
< spins
; ++j
)
476 boost::log::aux::pause();
481 boost::winapi::SwitchToThread();
485 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
488 // Check that the queue layout matches the current process ABI
489 if (hdr
->m_abi_tag
!= header::get_abi_tag())
490 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
492 if (!boost::log::aux::is_power_of_2(hdr
->m_block_size
))
493 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
495 m_mutex
.open((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".mutex").c_str(), &hdr
->m_mutex_state
);
496 m_nonempty_queue
.open((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonempty_queue_event").c_str());
497 m_nonfull_queue
.init((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonfull_queue_cond_var").c_str(), &hdr
->m_nonfull_queue_state
, perms
);
500 init_block_size(hdr
->m_block_size
);
503 void init_block_size(size_type block_size
)
505 m_block_size_mask
= block_size
- 1u;
507 uint32_t block_size_log2
= 0u;
508 if ((block_size
& 0x0000ffff) == 0u)
511 block_size_log2
+= 16u;
513 if ((block_size
& 0x000000ff) == 0u)
516 block_size_log2
+= 8u;
518 if ((block_size
& 0x0000000f) == 0u)
521 block_size_log2
+= 4u;
523 if ((block_size
& 0x00000003) == 0u)
526 block_size_log2
+= 2u;
528 if ((block_size
& 0x00000001) == 0u)
532 m_block_size_log2
= block_size_log2
;
537 return m_mutex
.lock(m_stop
.get());
542 header
* const hdr
= get_header();
546 m_nonfull_queue
.notify_all();
549 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
550 uint32_t estimate_block_count(size_type size
) const BOOST_NOEXCEPT
552 // ceil((size + get_header_overhead()) / block_size)
553 return static_cast< uint32_t >((size
+ block_header::get_header_overhead() + m_block_size_mask
) >> m_block_size_log2
);
556 //! Puts the message to the back of the queue
557 void enqueue_message(void const* message_data
, size_type message_size
, uint32_t block_count
)
559 header
* const hdr
= get_header();
561 const uint32_t capacity
= hdr
->m_capacity
;
562 const size_type block_size
= hdr
->m_block_size
;
563 uint32_t pos
= hdr
->m_put_pos
;
564 BOOST_ASSERT(pos
< capacity
);
566 block_header
* block
= hdr
->get_block(pos
);
567 block
->m_size
= message_size
;
569 size_type write_size
= (std::min
)(static_cast< size_type
>((capacity
- pos
) * block_size
- block_header::get_header_overhead()), message_size
);
570 std::memcpy(block
->get_data(), message_data
, write_size
);
573 if (BOOST_UNLIKELY(pos
>= capacity
))
575 // Write the rest of the message at the beginning of the queue
577 message_data
= static_cast< const unsigned char* >(message_data
) + write_size
;
578 write_size
= message_size
- write_size
;
580 std::memcpy(hdr
->get_block(0u), message_data
, write_size
);
583 hdr
->m_put_pos
= pos
;
585 const uint32_t old_queue_size
= hdr
->m_size
;
586 hdr
->m_size
= old_queue_size
+ block_count
;
587 if (old_queue_size
== 0u)
588 m_nonempty_queue
.set();
591 //! Retrieves the next message and invokes the handler to store the message contents
592 void dequeue_message(receive_handler handler
, void* state
)
594 header
* const hdr
= get_header();
596 const uint32_t capacity
= hdr
->m_capacity
;
597 const size_type block_size
= hdr
->m_block_size
;
598 uint32_t pos
= hdr
->m_get_pos
;
599 BOOST_ASSERT(pos
< capacity
);
601 block_header
* block
= hdr
->get_block(pos
);
602 size_type message_size
= block
->m_size
;
603 uint32_t block_count
= estimate_block_count(message_size
);
605 BOOST_ASSERT(block_count
<= hdr
->m_size
);
607 size_type read_size
= (std::min
)(static_cast< size_type
>((capacity
- pos
) * block_size
- block_header::get_header_overhead()), message_size
);
608 handler(state
, block
->get_data(), read_size
);
611 if (BOOST_UNLIKELY(pos
>= capacity
))
613 // Read the tail of the message
615 read_size
= message_size
- read_size
;
617 handler(state
, hdr
->get_block(0u), read_size
);
620 hdr
->m_get_pos
= pos
;
621 hdr
->m_size
-= block_count
;
623 m_nonfull_queue
.notify_all();
627 BOOST_LOG_API
void reliable_message_queue::create(object_name
const& name
, uint32_t capacity
, size_type block_size
, overflow_policy oflow_policy
, permissions
const& perms
)
629 BOOST_ASSERT(m_impl
== NULL
);
630 if (!boost::log::aux::is_power_of_2(block_size
))
631 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
634 m_impl
= new implementation(open_mode::create_only
, name
, capacity
, static_cast< size_type
>(boost::alignment::align_up(block_size
, BOOST_LOG_CPU_CACHE_LINE_SIZE
)), oflow_policy
, perms
);
636 catch (boost::exception
& e
)
638 e
<< boost::log::ipc::object_name_info(name
);
643 BOOST_LOG_API
void reliable_message_queue::open_or_create(object_name
const& name
, uint32_t capacity
, size_type block_size
, overflow_policy oflow_policy
, permissions
const& perms
)
645 BOOST_ASSERT(m_impl
== NULL
);
646 if (!boost::log::aux::is_power_of_2(block_size
))
647 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
650 m_impl
= new implementation(open_mode::open_or_create
, name
, capacity
, static_cast< size_type
>(boost::alignment::align_up(block_size
, BOOST_LOG_CPU_CACHE_LINE_SIZE
)), oflow_policy
, perms
);
652 catch (boost::exception
& e
)
654 e
<< boost::log::ipc::object_name_info(name
);
659 BOOST_LOG_API
void reliable_message_queue::open(object_name
const& name
, overflow_policy oflow_policy
, permissions
const& perms
)
661 BOOST_ASSERT(m_impl
== NULL
);
664 m_impl
= new implementation(open_mode::open_only
, name
, oflow_policy
, perms
);
666 catch (boost::exception
& e
)
668 e
<< boost::log::ipc::object_name_info(name
);
673 BOOST_LOG_API
void reliable_message_queue::clear()
675 BOOST_ASSERT(m_impl
!= NULL
);
680 catch (boost::exception
& e
)
682 e
<< boost::log::ipc::object_name_info(m_impl
->name());
687 BOOST_LOG_API object_name
const& reliable_message_queue::name() const
689 BOOST_ASSERT(m_impl
!= NULL
);
690 return m_impl
->name();
693 BOOST_LOG_API
uint32_t reliable_message_queue::capacity() const
695 BOOST_ASSERT(m_impl
!= NULL
);
696 return m_impl
->capacity();
699 BOOST_LOG_API
reliable_message_queue::size_type
reliable_message_queue::block_size() const
701 BOOST_ASSERT(m_impl
!= NULL
);
702 return m_impl
->block_size();
705 BOOST_LOG_API
void reliable_message_queue::stop_local()
707 BOOST_ASSERT(m_impl
!= NULL
);
710 m_impl
->stop_local();
712 catch (boost::exception
& e
)
714 e
<< boost::log::ipc::object_name_info(m_impl
->name());
719 BOOST_LOG_API
void reliable_message_queue::reset_local()
721 BOOST_ASSERT(m_impl
!= NULL
);
724 m_impl
->reset_local();
726 catch (boost::exception
& e
)
728 e
<< boost::log::ipc::object_name_info(m_impl
->name());
733 BOOST_LOG_API
void reliable_message_queue::do_close() BOOST_NOEXCEPT
739 BOOST_LOG_API
reliable_message_queue::operation_result
reliable_message_queue::send(void const* message_data
, size_type message_size
)
741 BOOST_ASSERT(m_impl
!= NULL
);
744 return m_impl
->send(message_data
, message_size
);
746 catch (boost::exception
& e
)
748 e
<< boost::log::ipc::object_name_info(m_impl
->name());
753 BOOST_LOG_API
bool reliable_message_queue::try_send(void const* message_data
, size_type message_size
)
755 BOOST_ASSERT(m_impl
!= NULL
);
758 return m_impl
->try_send(message_data
, message_size
);
760 catch (boost::exception
& e
)
762 e
<< boost::log::ipc::object_name_info(m_impl
->name());
767 BOOST_LOG_API
reliable_message_queue::operation_result
reliable_message_queue::do_receive(receive_handler handler
, void* state
)
769 BOOST_ASSERT(m_impl
!= NULL
);
772 return m_impl
->receive(handler
, state
);
774 catch (boost::exception
& e
)
776 e
<< boost::log::ipc::object_name_info(m_impl
->name());
781 BOOST_LOG_API
bool reliable_message_queue::do_try_receive(receive_handler handler
, void* state
)
783 BOOST_ASSERT(m_impl
!= NULL
);
786 return m_impl
->try_receive(handler
, state
);
788 catch (boost::exception
& e
)
790 e
<< boost::log::ipc::object_name_info(m_impl
->name());
795 //! Fixed buffer receive handler
796 BOOST_LOG_API
void reliable_message_queue::fixed_buffer_receive_handler(void* state
, const void* data
, size_type size
)
798 fixed_buffer_state
* p
= static_cast< fixed_buffer_state
* >(state
);
799 if (BOOST_UNLIKELY(size
> p
->size
))
800 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
802 std::memcpy(p
->data
, data
, size
);
807 BOOST_LOG_API
void reliable_message_queue::remove(object_name
const&)
809 // System objects are reference counted on Windows, nothing to do here
814 BOOST_LOG_CLOSE_NAMESPACE
// namespace log
818 #include <boost/log/detail/footer.hpp>