2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
9 * \file ipc_reliable_message_queue_win.hpp
11 * \author Andrey Semashev
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
17 * This file provides an interprocess message queue implementation on POSIX platforms.
20 #include <boost/log/detail/config.hpp>
28 #include <boost/assert.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/cstdint.hpp>
31 #include <boost/atomic/atomic.hpp>
32 #include <boost/atomic/capabilities.hpp>
33 #include <boost/log/exceptions.hpp>
34 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
35 #include <boost/log/support/exception.hpp>
36 #include <boost/log/detail/pause.hpp>
37 #include <boost/exception/info.hpp>
38 #include <boost/exception/enable_error_info.hpp>
39 #include <boost/align/align_up.hpp>
40 #include <boost/detail/winapi/thread.hpp> // SwitchToThread
41 #include "windows/ipc_sync_wrappers.hpp"
42 #include "windows/mapped_shared_memory.hpp"
43 #include "windows/utf_code_conversion.hpp"
44 #include "murmur3.hpp"
45 #include "bit_tools.hpp"
47 #include <boost/log/detail/header.hpp>
49 #if BOOST_ATOMIC_INT32_LOCK_FREE != 2
50 // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
51 #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
54 //! A suffix used in names of interprocess objects created by the queue.
55 //! Used as a protection against clashing with user-supplied names of interprocess queues and also to resolve conflicts between queues of different types.
56 #define BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".3010b9950926463398eee00b35b44651"
60 BOOST_LOG_OPEN_NAMESPACE
64 //! Message queue implementation data
65 struct reliable_message_queue::implementation
68 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
71 // Element data alignment, in bytes
72 enum { data_alignment
= 32u };
74 //! Size of the element data, in bytes
77 //! Returns the block header overhead, in bytes
78 static BOOST_CONSTEXPR size_type
get_header_overhead() BOOST_NOEXCEPT
80 return static_cast< size_type
>(boost::alignment::align_up(sizeof(block_header
), data_alignment
));
83 //! Returns a pointer to the element data
84 void* get_data() const BOOST_NOEXCEPT
86 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
90 //! Header of the message queue. Placed at the beginning of the shared memory segment.
93 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
94 enum { abi_version
= 0 };
96 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
98 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
100 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
101 unsigned char m_padding
[BOOST_LOG_CPU_CACHE_LINE_SIZE
- sizeof(uint32_t)];
102 //! A flag indicating that the queue is constructed (i.e. the queue is constructed when the value is not 0).
103 boost::atomic
< uint32_t > m_initialized
;
104 //! Number of allocation blocks in the queue.
105 const uint32_t m_capacity
;
106 //! Size of an allocation block, in bytes.
107 const size_type m_block_size
;
108 //! Shared state of the mutex for protecting queue data structures.
109 boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state
;
110 //! Shared state of the condition variable used to block writers when the queue is full.
111 boost::log::ipc::aux::interprocess_condition_variable::shared_state m_nonfull_queue_state
;
112 //! The current number of allocated blocks in the queue.
114 //! The current writing position (allocation block index).
116 //! The current reading position (allocation block index).
119 header(uint32_t capacity
, size_type block_size
) :
120 m_abi_tag(get_abi_tag()),
121 m_capacity(capacity
),
122 m_block_size(block_size
),
127 // Must be initialized last. m_initialized is zero-initialized initially.
128 m_initialized
.fetch_add(1u, boost::memory_order_release
);
131 //! Returns the header structure ABI tag
132 static uint32_t get_abi_tag() BOOST_NOEXCEPT
134 // This FOURCC identifies the queue type
135 boost::log::aux::murmur3_32
hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
137 // This FOURCC identifies the queue implementation
138 hash
.mix(boost::log::aux::make_fourcc('w', 'n', 't', '5'));
139 hash
.mix(abi_version
);
141 // We will use these constants to align pointers
142 hash
.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE
);
143 hash
.mix(block_header::data_alignment
);
145 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
146 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
148 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
149 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
150 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
152 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag
);
153 BOOST_LOG_MIX_HEADER_MEMBER(m_padding
);
154 BOOST_LOG_MIX_HEADER_MEMBER(m_initialized
);
155 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity
);
156 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size
);
157 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex_state
);
158 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue_state
);
159 BOOST_LOG_MIX_HEADER_MEMBER(m_size
);
160 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos
);
161 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos
);
163 #undef BOOST_LOG_MIX_HEADER_MEMBER
165 return hash
.finalize();
168 //! Returns an element header at the specified index
169 block_header
* get_block(uint32_t index
) const BOOST_NOEXCEPT
171 BOOST_ASSERT(index
< m_capacity
);
172 unsigned char* p
= const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header
), BOOST_LOG_CPU_CACHE_LINE_SIZE
);
173 p
+= static_cast< std::size_t >(m_block_size
) * static_cast< std::size_t >(index
);
174 return reinterpret_cast< block_header
* >(p
);
177 BOOST_DELETED_FUNCTION(header(header
const&))
178 BOOST_DELETED_FUNCTION(header
& operator=(header
const&))
182 //! Shared memory object and mapping
183 boost::log::ipc::aux::mapped_shared_memory m_shared_memory
;
184 //! Queue overflow handling policy
185 const overflow_policy m_overflow_policy
;
186 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
187 size_type m_block_size_mask
;
188 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
189 uint32_t m_block_size_log2
;
191 //! Mutex for protecting queue data structures.
192 boost::log::ipc::aux::interprocess_mutex m_mutex
;
193 //! Event used to block readers when the queue is empty.
194 boost::log::ipc::aux::interprocess_event m_nonempty_queue
;
195 //! Condition variable used to block writers when the queue is full.
196 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue
;
197 //! The event indicates that stop has been requested
198 boost::log::ipc::aux::auto_handle m_stop
;
200 //! The queue name, as specified by the user
201 const object_name m_name
;
204 //! The constructor creates a new shared memory segment
207 open_mode::create_only_tag
,
208 object_name
const& name
,
210 size_type block_size
,
211 overflow_policy oflow_policy
,
212 permissions
const& perms
214 m_overflow_policy(oflow_policy
),
215 m_block_size_mask(0u),
216 m_block_size_log2(0u),
219 const std::wstring wname
= boost::log::aux::utf8_to_utf16(name
.c_str());
220 const std::size_t shmem_size
= estimate_region_size(capacity
, block_size
);
221 m_shared_memory
.create(wname
.c_str(), shmem_size
, perms
);
222 m_shared_memory
.map();
224 create_queue(wname
, capacity
, block_size
, perms
);
227 //! The constructor creates a new shared memory segment or opens the existing one
230 open_mode::open_or_create_tag
,
231 object_name
const& name
,
233 size_type block_size
,
234 overflow_policy oflow_policy
,
235 permissions
const& perms
237 m_overflow_policy(oflow_policy
),
238 m_block_size_mask(0u),
239 m_block_size_log2(0u),
242 const std::wstring wname
= boost::log::aux::utf8_to_utf16(name
.c_str());
243 const std::size_t shmem_size
= estimate_region_size(capacity
, block_size
);
244 const bool created
= m_shared_memory
.create_or_open(wname
.c_str(), shmem_size
, perms
);
245 m_shared_memory
.map();
248 create_queue(wname
, capacity
, block_size
, perms
);
250 adopt_queue(wname
, m_shared_memory
.size(), perms
);
253 //! The constructor opens the existing shared memory segment
256 open_mode::open_only_tag
,
257 object_name
const& name
,
258 overflow_policy oflow_policy
,
259 permissions
const& perms
261 m_overflow_policy(oflow_policy
),
262 m_block_size_mask(0u),
263 m_block_size_log2(0u),
266 const std::wstring wname
= boost::log::aux::utf8_to_utf16(name
.c_str());
267 m_shared_memory
.open(wname
.c_str());
268 m_shared_memory
.map();
270 adopt_queue(wname
, m_shared_memory
.size(), perms
);
273 object_name
const& name() const BOOST_NOEXCEPT
278 uint32_t capacity() const BOOST_NOEXCEPT
280 return get_header()->m_capacity
;
283 size_type
block_size() const BOOST_NOEXCEPT
285 return get_header()->m_block_size
;
288 operation_result
send(void const* message_data
, size_type message_size
)
290 const uint32_t block_count
= estimate_block_count(message_size
);
292 header
* const hdr
= get_header();
294 if (BOOST_UNLIKELY(block_count
> hdr
->m_capacity
))
295 BOOST_LOG_THROW_DESCR(logic_error
, "Message size exceeds the interprocess queue capacity");
300 boost::log::ipc::aux::interprocess_mutex::optional_unlock
unlock(m_mutex
);
304 if ((hdr
->m_capacity
- hdr
->m_size
) >= block_count
)
307 const overflow_policy oflow_policy
= m_overflow_policy
;
308 if (oflow_policy
== fail_on_overflow
)
310 else if (BOOST_UNLIKELY(oflow_policy
== throw_on_overflow
))
311 BOOST_LOG_THROW_DESCR(capacity_limit_reached
, "Interprocess queue is full");
313 if (!m_nonfull_queue
.wait(unlock
, m_stop
.get()))
317 enqueue_message(message_data
, message_size
, block_count
);
322 bool try_send(void const* message_data
, size_type message_size
)
324 const uint32_t block_count
= estimate_block_count(message_size
);
326 header
* const hdr
= get_header();
328 if (BOOST_UNLIKELY(block_count
> hdr
->m_capacity
))
329 BOOST_LOG_THROW_DESCR(logic_error
, "Message size exceeds the interprocess queue capacity");
334 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(m_mutex
);
336 if ((hdr
->m_capacity
- hdr
->m_size
) < block_count
)
339 enqueue_message(message_data
, message_size
, block_count
);
344 operation_result
receive(receive_handler handler
, void* state
)
349 boost::log::ipc::aux::interprocess_mutex::optional_unlock
unlock(m_mutex
);
351 header
* const hdr
= get_header();
355 if (hdr
->m_size
> 0u)
361 if (!m_nonempty_queue
.wait(m_stop
.get()) || !lock_queue())
364 unlock
.engage(m_mutex
);
367 dequeue_message(handler
, state
);
372 bool try_receive(receive_handler handler
, void* state
)
377 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(m_mutex
);
379 header
* const hdr
= get_header();
380 if (hdr
->m_size
== 0u)
383 dequeue_message(handler
, state
);
390 BOOST_VERIFY(boost::detail::winapi::SetEvent(m_stop
.get()) != 0);
395 BOOST_VERIFY(boost::detail::winapi::ResetEvent(m_stop
.get()) != 0);
401 boost::log::ipc::aux::interprocess_mutex::auto_unlock
unlock(m_mutex
);
406 header
* get_header() const BOOST_NOEXCEPT
408 return static_cast< header
* >(m_shared_memory
.address());
411 static std::size_t estimate_region_size(uint32_t capacity
, size_type block_size
) BOOST_NOEXCEPT
413 return boost::alignment::align_up(sizeof(header
), BOOST_LOG_CPU_CACHE_LINE_SIZE
) + static_cast< std::size_t >(capacity
) * static_cast< std::size_t >(block_size
);
416 void create_stop_event()
418 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
419 boost::detail::winapi::HANDLE_ h
= boost::detail::winapi::CreateEventExW
423 boost::detail::winapi::CREATE_EVENT_MANUAL_RESET_
,
424 boost::detail::winapi::SYNCHRONIZE_
| boost::detail::winapi::EVENT_MODIFY_STATE_
427 boost::detail::winapi::HANDLE_ h
= boost::detail::winapi::CreateEventW
430 true, // manual reset
431 false, // initial state
435 if (BOOST_UNLIKELY(h
== NULL
))
437 boost::detail::winapi::DWORD_ err
= boost::detail::winapi::GetLastError();
438 BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error
, "Failed to create an stop event object", (err
));
444 void create_queue(std::wstring
const& name
, uint32_t capacity
, size_type block_size
, permissions
const& perms
)
446 // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized
447 header
* const hdr
= get_header();
448 m_mutex
.create((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".mutex").c_str(), &hdr
->m_mutex_state
, perms
);
449 m_nonempty_queue
.create((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonempty_queue_event").c_str(), false, perms
);
450 m_nonfull_queue
.init((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonfull_queue_cond_var").c_str(), &hdr
->m_nonfull_queue_state
, perms
);
453 new (hdr
) header(capacity
, block_size
);
455 init_block_size(block_size
);
458 void adopt_queue(std::wstring
const& name
, std::size_t shmem_size
, permissions
const& perms
)
460 if (shmem_size
< sizeof(header
))
461 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
463 // Wait until the mapped region becomes initialized
464 header
* const hdr
= get_header();
465 BOOST_CONSTEXPR_OR_CONST
unsigned int wait_loops
= 1000u, spin_loops
= 16u, spins
= 16u;
466 for (unsigned int i
= 0; i
< wait_loops
; ++i
)
468 uint32_t initialized
= hdr
->m_initialized
.load(boost::memory_order_acquire
);
476 for (unsigned int j
= 0; j
< spins
; ++j
)
478 boost::log::aux::pause();
483 boost::detail::winapi::SwitchToThread();
487 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
490 // Check that the queue layout matches the current process ABI
491 if (hdr
->m_abi_tag
!= header::get_abi_tag())
492 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
494 if (!boost::log::aux::is_power_of_2(hdr
->m_block_size
))
495 BOOST_LOG_THROW_DESCR(setup_error
, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
497 m_mutex
.open((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".mutex").c_str(), &hdr
->m_mutex_state
);
498 m_nonempty_queue
.open((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonempty_queue_event").c_str());
499 m_nonfull_queue
.init((name
+ BOOST_LOG_IPC_NAMES_AUX_SUFFIX L
".nonfull_queue_cond_var").c_str(), &hdr
->m_nonfull_queue_state
, perms
);
502 init_block_size(hdr
->m_block_size
);
505 void init_block_size(size_type block_size
)
507 m_block_size_mask
= block_size
- 1u;
509 uint32_t block_size_log2
= 0u;
510 if ((block_size
& 0x0000ffff) == 0u)
513 block_size_log2
+= 16u;
515 if ((block_size
& 0x000000ff) == 0u)
518 block_size_log2
+= 8u;
520 if ((block_size
& 0x0000000f) == 0u)
523 block_size_log2
+= 4u;
525 if ((block_size
& 0x00000003) == 0u)
528 block_size_log2
+= 2u;
530 if ((block_size
& 0x00000001) == 0u)
534 m_block_size_log2
= block_size_log2
;
539 return m_mutex
.lock(m_stop
.get());
544 header
* const hdr
= get_header();
548 m_nonfull_queue
.notify_all();
551 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
552 uint32_t estimate_block_count(size_type size
) const BOOST_NOEXCEPT
554 // ceil((size + get_header_overhead()) / block_size)
555 return static_cast< uint32_t >((size
+ block_header::get_header_overhead() + m_block_size_mask
) >> m_block_size_log2
);
558 //! Puts the message to the back of the queue
559 void enqueue_message(void const* message_data
, size_type message_size
, uint32_t block_count
)
561 header
* const hdr
= get_header();
563 const uint32_t capacity
= hdr
->m_capacity
;
564 const size_type block_size
= hdr
->m_block_size
;
565 uint32_t pos
= hdr
->m_put_pos
;
567 block_header
* block
= hdr
->get_block(pos
);
568 block
->m_size
= message_size
;
570 size_type write_size
= (std::min
)(static_cast< size_type
>((capacity
- pos
) * block_size
- block_header::get_header_overhead()), message_size
);
571 std::memcpy(block
->get_data(), message_data
, write_size
);
574 if (BOOST_UNLIKELY(pos
>= capacity
))
576 // Write the rest of the message at the beginning of the queue
578 message_data
= static_cast< const unsigned char* >(message_data
) + write_size
;
579 write_size
= message_size
- write_size
;
581 std::memcpy(hdr
->get_block(0u), message_data
, write_size
);
584 hdr
->m_put_pos
= pos
;
586 const uint32_t old_queue_size
= hdr
->m_size
;
587 hdr
->m_size
= old_queue_size
+ block_count
;
588 if (old_queue_size
== 0u)
589 m_nonempty_queue
.set();
592 //! Retrieves the next message and invokes the handler to store the message contents
593 void dequeue_message(receive_handler handler
, void* state
)
595 header
* const hdr
= get_header();
597 const uint32_t capacity
= hdr
->m_capacity
;
598 const size_type block_size
= hdr
->m_block_size
;
599 uint32_t pos
= hdr
->m_get_pos
;
601 block_header
* block
= hdr
->get_block(pos
);
602 size_type message_size
= block
->m_size
;
603 uint32_t block_count
= estimate_block_count(message_size
);
605 BOOST_ASSERT(block_count
<= hdr
->m_size
);
607 size_type read_size
= (std::min
)(static_cast< size_type
>((capacity
- pos
) * block_size
- block_header::get_header_overhead()), message_size
);
608 handler(state
, block
->get_data(), read_size
);
611 if (BOOST_UNLIKELY(pos
>= capacity
))
613 // Read the tail of the message
615 read_size
= message_size
- read_size
;
617 handler(state
, hdr
->get_block(0u), read_size
);
620 hdr
->m_get_pos
= pos
;
621 hdr
->m_size
-= block_count
;
623 m_nonfull_queue
.notify_all();
627 BOOST_LOG_API
void reliable_message_queue::create(object_name
const& name
, uint32_t capacity
, size_type block_size
, overflow_policy oflow_policy
, permissions
const& perms
)
629 BOOST_ASSERT(m_impl
== NULL
);
630 if (!boost::log::aux::is_power_of_2(block_size
))
631 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
634 m_impl
= new implementation(open_mode::create_only
, name
, capacity
, static_cast< size_type
>(boost::alignment::align_up(block_size
, BOOST_LOG_CPU_CACHE_LINE_SIZE
)), oflow_policy
, perms
);
636 catch (boost::exception
& e
)
638 e
<< boost::log::ipc::object_name_info(name
);
643 BOOST_LOG_API
void reliable_message_queue::open_or_create(object_name
const& name
, uint32_t capacity
, size_type block_size
, overflow_policy oflow_policy
, permissions
const& perms
)
645 BOOST_ASSERT(m_impl
== NULL
);
646 if (!boost::log::aux::is_power_of_2(block_size
))
647 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
650 m_impl
= new implementation(open_mode::open_or_create
, name
, capacity
, static_cast< size_type
>(boost::alignment::align_up(block_size
, BOOST_LOG_CPU_CACHE_LINE_SIZE
)), oflow_policy
, perms
);
652 catch (boost::exception
& e
)
654 e
<< boost::log::ipc::object_name_info(name
);
659 BOOST_LOG_API
void reliable_message_queue::open(object_name
const& name
, overflow_policy oflow_policy
, permissions
const& perms
)
661 BOOST_ASSERT(m_impl
== NULL
);
664 m_impl
= new implementation(open_mode::open_only
, name
, oflow_policy
, perms
);
666 catch (boost::exception
& e
)
668 e
<< boost::log::ipc::object_name_info(name
);
673 BOOST_LOG_API
void reliable_message_queue::clear()
675 BOOST_ASSERT(m_impl
!= NULL
);
680 catch (boost::exception
& e
)
682 e
<< boost::log::ipc::object_name_info(m_impl
->name());
687 BOOST_LOG_API object_name
const& reliable_message_queue::name() const
689 BOOST_ASSERT(m_impl
!= NULL
);
690 return m_impl
->name();
693 BOOST_LOG_API
uint32_t reliable_message_queue::capacity() const
695 BOOST_ASSERT(m_impl
!= NULL
);
696 return m_impl
->capacity();
699 BOOST_LOG_API
reliable_message_queue::size_type
reliable_message_queue::block_size() const
701 BOOST_ASSERT(m_impl
!= NULL
);
702 return m_impl
->block_size();
705 BOOST_LOG_API
void reliable_message_queue::stop_local()
707 BOOST_ASSERT(m_impl
!= NULL
);
710 m_impl
->stop_local();
712 catch (boost::exception
& e
)
714 e
<< boost::log::ipc::object_name_info(m_impl
->name());
719 BOOST_LOG_API
void reliable_message_queue::reset_local()
721 BOOST_ASSERT(m_impl
!= NULL
);
724 m_impl
->reset_local();
726 catch (boost::exception
& e
)
728 e
<< boost::log::ipc::object_name_info(m_impl
->name());
733 BOOST_LOG_API
void reliable_message_queue::do_close() BOOST_NOEXCEPT
739 BOOST_LOG_API
reliable_message_queue::operation_result
reliable_message_queue::send(void const* message_data
, size_type message_size
)
741 BOOST_ASSERT(m_impl
!= NULL
);
744 return m_impl
->send(message_data
, message_size
);
746 catch (boost::exception
& e
)
748 e
<< boost::log::ipc::object_name_info(m_impl
->name());
753 BOOST_LOG_API
bool reliable_message_queue::try_send(void const* message_data
, size_type message_size
)
755 BOOST_ASSERT(m_impl
!= NULL
);
758 return m_impl
->try_send(message_data
, message_size
);
760 catch (boost::exception
& e
)
762 e
<< boost::log::ipc::object_name_info(m_impl
->name());
767 BOOST_LOG_API
reliable_message_queue::operation_result
reliable_message_queue::do_receive(receive_handler handler
, void* state
)
769 BOOST_ASSERT(m_impl
!= NULL
);
772 return m_impl
->receive(handler
, state
);
774 catch (boost::exception
& e
)
776 e
<< boost::log::ipc::object_name_info(m_impl
->name());
781 BOOST_LOG_API
bool reliable_message_queue::do_try_receive(receive_handler handler
, void* state
)
783 BOOST_ASSERT(m_impl
!= NULL
);
786 return m_impl
->try_receive(handler
, state
);
788 catch (boost::exception
& e
)
790 e
<< boost::log::ipc::object_name_info(m_impl
->name());
795 //! Fixed buffer receive handler
796 BOOST_LOG_API
void reliable_message_queue::fixed_buffer_receive_handler(void* state
, const void* data
, size_type size
)
798 fixed_buffer_state
* p
= static_cast< fixed_buffer_state
* >(state
);
799 if (BOOST_UNLIKELY(size
> p
->size
))
800 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
802 std::memcpy(p
->data
, data
, size
);
807 BOOST_LOG_API
void reliable_message_queue::remove(object_name
const&)
809 // System objects are reference counted on Windows, nothing to do here
814 BOOST_LOG_CLOSE_NAMESPACE
// namespace log
818 #include <boost/log/detail/footer.hpp>