]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/log/src/windows/ipc_reliable_message_queue.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / log / src / windows / ipc_reliable_message_queue.cpp
1 /*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8 /*!
9 * \file windows/ipc_reliable_message_queue.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 28.10.2015
13 *
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16 *
17 * This file provides an interprocess message queue implementation on Windows platforms.
18 */
19
20 #include <boost/log/detail/config.hpp>
21 #include <cstddef>
22 #include <cstring>
23 #include <new>
24 #include <limits>
25 #include <string>
26 #include <algorithm>
27 #include <stdexcept>
28 #include <boost/assert.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/cstdint.hpp>
31 #include <boost/atomic/atomic.hpp>
32 #include <boost/atomic/capabilities.hpp>
33 #include <boost/log/exceptions.hpp>
34 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
35 #include <boost/log/support/exception.hpp>
36 #include <boost/log/detail/pause.hpp>
37 #include <boost/exception/info.hpp>
38 #include <boost/exception/enable_error_info.hpp>
39 #include <boost/align/align_up.hpp>
40 #include <boost/detail/winapi/thread.hpp> // SwitchToThread
41 #include "windows/ipc_sync_wrappers.hpp"
42 #include "windows/mapped_shared_memory.hpp"
43 #include "windows/utf_code_conversion.hpp"
44 #include "murmur3.hpp"
45 #include "bit_tools.hpp"
46 #include <windows.h>
47 #include <boost/log/detail/header.hpp>
48
49 #if BOOST_ATOMIC_INT32_LOCK_FREE != 2
50 // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
51 #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
52 #endif
53
54 //! A suffix used in names of interprocess objects created by the queue.
55 //! Used as a protection against clashing with user-supplied names of interprocess queues and also to resolve conflicts between queues of different types.
56 #define BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".3010b9950926463398eee00b35b44651"
57
58 namespace boost {
59
60 BOOST_LOG_OPEN_NAMESPACE
61
62 namespace ipc {
63
64 //! Message queue implementation data
65 struct reliable_message_queue::implementation
66 {
67 private:
68 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
69 struct block_header
70 {
71 // Element data alignment, in bytes
72 enum { data_alignment = 32u };
73
74 //! Size of the element data, in bytes
75 size_type m_size;
76
77 //! Returns the block header overhead, in bytes
78 static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
79 {
80 return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment));
81 }
82
83 //! Returns a pointer to the element data
84 void* get_data() const BOOST_NOEXCEPT
85 {
86 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
87 }
88 };
89
90 //! Header of the message queue. Placed at the beginning of the shared memory segment.
91 struct header
92 {
93 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
94 enum { abi_version = 0 };
95
96 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
97
98 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
99 uint32_t m_abi_tag;
100 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
101 unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
102 //! A flag indicating that the queue is constructed (i.e. the queue is constructed when the value is not 0).
103 boost::atomic< uint32_t > m_initialized;
104 //! Number of allocation blocks in the queue.
105 const uint32_t m_capacity;
106 //! Size of an allocation block, in bytes.
107 const size_type m_block_size;
108 //! Shared state of the mutex for protecting queue data structures.
109 boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state;
110 //! Shared state of the condition variable used to block writers when the queue is full.
111 boost::log::ipc::aux::interprocess_condition_variable::shared_state m_nonfull_queue_state;
112 //! The current number of allocated blocks in the queue.
113 uint32_t m_size;
114 //! The current writing position (allocation block index).
115 uint32_t m_put_pos;
116 //! The current reading position (allocation block index).
117 uint32_t m_get_pos;
118
119 header(uint32_t capacity, size_type block_size) :
120 m_abi_tag(get_abi_tag()),
121 m_capacity(capacity),
122 m_block_size(block_size),
123 m_size(0u),
124 m_put_pos(0u),
125 m_get_pos(0u)
126 {
127 // Must be initialized last. m_initialized is zero-initialized initially.
128 m_initialized.fetch_add(1u, boost::memory_order_release);
129 }
130
131 //! Returns the header structure ABI tag
132 static uint32_t get_abi_tag() BOOST_NOEXCEPT
133 {
134 // This FOURCC identifies the queue type
135 boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
136
137 // This FOURCC identifies the queue implementation
138 hash.mix(boost::log::aux::make_fourcc('w', 'n', 't', '5'));
139 hash.mix(abi_version);
140
141 // We will use these constants to align pointers
142 hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
143 hash.mix(block_header::data_alignment);
144
145 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
146 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
147
148 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
149 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
150 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
151
152 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
153 BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
154 BOOST_LOG_MIX_HEADER_MEMBER(m_initialized);
155 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
156 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
157 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex_state);
158 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue_state);
159 BOOST_LOG_MIX_HEADER_MEMBER(m_size);
160 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
161 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
162
163 #undef BOOST_LOG_MIX_HEADER_MEMBER
164
165 return hash.finalize();
166 }
167
168 //! Returns an element header at the specified index
169 block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
170 {
171 BOOST_ASSERT(index < m_capacity);
172 unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
173 p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
174 return reinterpret_cast< block_header* >(p);
175 }
176
177 BOOST_DELETED_FUNCTION(header(header const&))
178 BOOST_DELETED_FUNCTION(header& operator=(header const&))
179 };
180
181 private:
182 //! Shared memory object and mapping
183 boost::log::ipc::aux::mapped_shared_memory m_shared_memory;
184 //! Queue overflow handling policy
185 const overflow_policy m_overflow_policy;
186 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
187 size_type m_block_size_mask;
188 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
189 uint32_t m_block_size_log2;
190
191 //! Mutex for protecting queue data structures.
192 boost::log::ipc::aux::interprocess_mutex m_mutex;
193 //! Event used to block readers when the queue is empty.
194 boost::log::ipc::aux::interprocess_event m_nonempty_queue;
195 //! Condition variable used to block writers when the queue is full.
196 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
197 //! The event indicates that stop has been requested
198 boost::log::ipc::aux::auto_handle m_stop;
199
200 //! The queue name, as specified by the user
201 const object_name m_name;
202
203 public:
204 //! The constructor creates a new shared memory segment
205 implementation
206 (
207 open_mode::create_only_tag,
208 object_name const& name,
209 uint32_t capacity,
210 size_type block_size,
211 overflow_policy oflow_policy,
212 permissions const& perms
213 ) :
214 m_overflow_policy(oflow_policy),
215 m_block_size_mask(0u),
216 m_block_size_log2(0u),
217 m_name(name)
218 {
219 const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
220 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
221 m_shared_memory.create(wname.c_str(), shmem_size, perms);
222 m_shared_memory.map();
223
224 create_queue(wname, capacity, block_size, perms);
225 }
226
227 //! The constructor creates a new shared memory segment or opens the existing one
228 implementation
229 (
230 open_mode::open_or_create_tag,
231 object_name const& name,
232 uint32_t capacity,
233 size_type block_size,
234 overflow_policy oflow_policy,
235 permissions const& perms
236 ) :
237 m_overflow_policy(oflow_policy),
238 m_block_size_mask(0u),
239 m_block_size_log2(0u),
240 m_name(name)
241 {
242 const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
243 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
244 const bool created = m_shared_memory.create_or_open(wname.c_str(), shmem_size, perms);
245 m_shared_memory.map();
246
247 if (created)
248 create_queue(wname, capacity, block_size, perms);
249 else
250 adopt_queue(wname, m_shared_memory.size(), perms);
251 }
252
253 //! The constructor opens the existing shared memory segment
254 implementation
255 (
256 open_mode::open_only_tag,
257 object_name const& name,
258 overflow_policy oflow_policy,
259 permissions const& perms
260 ) :
261 m_overflow_policy(oflow_policy),
262 m_block_size_mask(0u),
263 m_block_size_log2(0u),
264 m_name(name)
265 {
266 const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
267 m_shared_memory.open(wname.c_str());
268 m_shared_memory.map();
269
270 adopt_queue(wname, m_shared_memory.size(), perms);
271 }
272
273 object_name const& name() const BOOST_NOEXCEPT
274 {
275 return m_name;
276 }
277
278 uint32_t capacity() const BOOST_NOEXCEPT
279 {
280 return get_header()->m_capacity;
281 }
282
283 size_type block_size() const BOOST_NOEXCEPT
284 {
285 return get_header()->m_block_size;
286 }
287
288 operation_result send(void const* message_data, size_type message_size)
289 {
290 const uint32_t block_count = estimate_block_count(message_size);
291
292 header* const hdr = get_header();
293
294 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
295 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
296
297 if (!lock_queue())
298 return aborted;
299
300 boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex);
301
302 while (true)
303 {
304 if ((hdr->m_capacity - hdr->m_size) >= block_count)
305 break;
306
307 const overflow_policy oflow_policy = m_overflow_policy;
308 if (oflow_policy == fail_on_overflow)
309 return no_space;
310 else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
311 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
312
313 if (!m_nonfull_queue.wait(unlock, m_stop.get()))
314 return aborted;
315 }
316
317 enqueue_message(message_data, message_size, block_count);
318
319 return succeeded;
320 }
321
322 bool try_send(void const* message_data, size_type message_size)
323 {
324 const uint32_t block_count = estimate_block_count(message_size);
325
326 header* const hdr = get_header();
327
328 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
329 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
330
331 if (!lock_queue())
332 return false;
333
334 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
335
336 if ((hdr->m_capacity - hdr->m_size) < block_count)
337 return false;
338
339 enqueue_message(message_data, message_size, block_count);
340
341 return true;
342 }
343
344 operation_result receive(receive_handler handler, void* state)
345 {
346 if (!lock_queue())
347 return aborted;
348
349 boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex);
350
351 header* const hdr = get_header();
352
353 while (true)
354 {
355 if (hdr->m_size > 0u)
356 break;
357
358 m_mutex.unlock();
359 unlock.disengage();
360
361 if (!m_nonempty_queue.wait(m_stop.get()) || !lock_queue())
362 return aborted;
363
364 unlock.engage(m_mutex);
365 }
366
367 dequeue_message(handler, state);
368
369 return succeeded;
370 }
371
372 bool try_receive(receive_handler handler, void* state)
373 {
374 if (!lock_queue())
375 return false;
376
377 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
378
379 header* const hdr = get_header();
380 if (hdr->m_size == 0u)
381 return false;
382
383 dequeue_message(handler, state);
384
385 return true;
386 }
387
388 void stop_local()
389 {
390 BOOST_VERIFY(boost::detail::winapi::SetEvent(m_stop.get()) != 0);
391 }
392
393 void reset_local()
394 {
395 BOOST_VERIFY(boost::detail::winapi::ResetEvent(m_stop.get()) != 0);
396 }
397
398 void clear()
399 {
400 m_mutex.lock();
401 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
402 clear_queue();
403 }
404
405 private:
406 header* get_header() const BOOST_NOEXCEPT
407 {
408 return static_cast< header* >(m_shared_memory.address());
409 }
410
411 static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
412 {
413 return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
414 }
415
416 void create_stop_event()
417 {
418 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
419 boost::detail::winapi::HANDLE_ h = boost::detail::winapi::CreateEventExW
420 (
421 NULL, // permissions
422 NULL, // name
423 boost::detail::winapi::CREATE_EVENT_MANUAL_RESET_,
424 boost::detail::winapi::SYNCHRONIZE_ | boost::detail::winapi::EVENT_MODIFY_STATE_
425 );
426 #else
427 boost::detail::winapi::HANDLE_ h = boost::detail::winapi::CreateEventW
428 (
429 NULL, // permissions
430 true, // manual reset
431 false, // initial state
432 NULL // name
433 );
434 #endif
435 if (BOOST_UNLIKELY(h == NULL))
436 {
437 boost::detail::winapi::DWORD_ err = boost::detail::winapi::GetLastError();
438 BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an stop event object", (err));
439 }
440
441 m_stop.init(h);
442 }
443
444 void create_queue(std::wstring const& name, uint32_t capacity, size_type block_size, permissions const& perms)
445 {
446 // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized
447 header* const hdr = get_header();
448 m_mutex.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state, perms);
449 m_nonempty_queue.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str(), false, perms);
450 m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms);
451 create_stop_event();
452
453 new (hdr) header(capacity, block_size);
454
455 init_block_size(block_size);
456 }
457
458 void adopt_queue(std::wstring const& name, std::size_t shmem_size, permissions const& perms)
459 {
460 if (shmem_size < sizeof(header))
461 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
462
463 // Wait until the mapped region becomes initialized
464 header* const hdr = get_header();
465 BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 1000u, spin_loops = 16u, spins = 16u;
466 for (unsigned int i = 0; i < wait_loops; ++i)
467 {
468 uint32_t initialized = hdr->m_initialized.load(boost::memory_order_acquire);
469 if (initialized)
470 {
471 goto done;
472 }
473
474 if (i < spin_loops)
475 {
476 for (unsigned int j = 0; j < spins; ++j)
477 {
478 boost::log::aux::pause();
479 }
480 }
481 else
482 {
483 boost::detail::winapi::SwitchToThread();
484 }
485 }
486
487 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
488
489 done:
490 // Check that the queue layout matches the current process ABI
491 if (hdr->m_abi_tag != header::get_abi_tag())
492 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
493
494 if (!boost::log::aux::is_power_of_2(hdr->m_block_size))
495 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
496
497 m_mutex.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state);
498 m_nonempty_queue.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str());
499 m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms);
500 create_stop_event();
501
502 init_block_size(hdr->m_block_size);
503 }
504
505 void init_block_size(size_type block_size)
506 {
507 m_block_size_mask = block_size - 1u;
508
509 uint32_t block_size_log2 = 0u;
510 if ((block_size & 0x0000ffff) == 0u)
511 {
512 block_size >>= 16u;
513 block_size_log2 += 16u;
514 }
515 if ((block_size & 0x000000ff) == 0u)
516 {
517 block_size >>= 8u;
518 block_size_log2 += 8u;
519 }
520 if ((block_size & 0x0000000f) == 0u)
521 {
522 block_size >>= 4u;
523 block_size_log2 += 4u;
524 }
525 if ((block_size & 0x00000003) == 0u)
526 {
527 block_size >>= 2u;
528 block_size_log2 += 2u;
529 }
530 if ((block_size & 0x00000001) == 0u)
531 {
532 ++block_size_log2;
533 }
534 m_block_size_log2 = block_size_log2;
535 }
536
537 bool lock_queue()
538 {
539 return m_mutex.lock(m_stop.get());
540 }
541
542 void clear_queue()
543 {
544 header* const hdr = get_header();
545 hdr->m_size = 0u;
546 hdr->m_put_pos = 0u;
547 hdr->m_get_pos = 0u;
548 m_nonfull_queue.notify_all();
549 }
550
551 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
552 uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
553 {
554 // ceil((size + get_header_overhead()) / block_size)
555 return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
556 }
557
558 //! Puts the message to the back of the queue
559 void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
560 {
561 header* const hdr = get_header();
562
563 const uint32_t capacity = hdr->m_capacity;
564 const size_type block_size = hdr->m_block_size;
565 uint32_t pos = hdr->m_put_pos;
566
567 block_header* block = hdr->get_block(pos);
568 block->m_size = message_size;
569
570 size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
571 std::memcpy(block->get_data(), message_data, write_size);
572
573 pos += block_count;
574 if (BOOST_UNLIKELY(pos >= capacity))
575 {
576 // Write the rest of the message at the beginning of the queue
577 pos -= capacity;
578 message_data = static_cast< const unsigned char* >(message_data) + write_size;
579 write_size = message_size - write_size;
580 if (write_size > 0u)
581 std::memcpy(hdr->get_block(0u), message_data, write_size);
582 }
583
584 hdr->m_put_pos = pos;
585
586 const uint32_t old_queue_size = hdr->m_size;
587 hdr->m_size = old_queue_size + block_count;
588 if (old_queue_size == 0u)
589 m_nonempty_queue.set();
590 }
591
592 //! Retrieves the next message and invokes the handler to store the message contents
593 void dequeue_message(receive_handler handler, void* state)
594 {
595 header* const hdr = get_header();
596
597 const uint32_t capacity = hdr->m_capacity;
598 const size_type block_size = hdr->m_block_size;
599 uint32_t pos = hdr->m_get_pos;
600
601 block_header* block = hdr->get_block(pos);
602 size_type message_size = block->m_size;
603 uint32_t block_count = estimate_block_count(message_size);
604
605 BOOST_ASSERT(block_count <= hdr->m_size);
606
607 size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
608 handler(state, block->get_data(), read_size);
609
610 pos += block_count;
611 if (BOOST_UNLIKELY(pos >= capacity))
612 {
613 // Read the tail of the message
614 pos -= capacity;
615 read_size = message_size - read_size;
616 if (read_size > 0u)
617 handler(state, hdr->get_block(0u), read_size);
618 }
619
620 hdr->m_get_pos = pos;
621 hdr->m_size -= block_count;
622
623 m_nonfull_queue.notify_all();
624 }
625 };
626
627 BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
628 {
629 BOOST_ASSERT(m_impl == NULL);
630 if (!boost::log::aux::is_power_of_2(block_size))
631 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
632 try
633 {
634 m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
635 }
636 catch (boost::exception& e)
637 {
638 e << boost::log::ipc::object_name_info(name);
639 throw;
640 }
641 }
642
643 BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
644 {
645 BOOST_ASSERT(m_impl == NULL);
646 if (!boost::log::aux::is_power_of_2(block_size))
647 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
648 try
649 {
650 m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
651 }
652 catch (boost::exception& e)
653 {
654 e << boost::log::ipc::object_name_info(name);
655 throw;
656 }
657 }
658
659 BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const& perms)
660 {
661 BOOST_ASSERT(m_impl == NULL);
662 try
663 {
664 m_impl = new implementation(open_mode::open_only, name, oflow_policy, perms);
665 }
666 catch (boost::exception& e)
667 {
668 e << boost::log::ipc::object_name_info(name);
669 throw;
670 }
671 }
672
673 BOOST_LOG_API void reliable_message_queue::clear()
674 {
675 BOOST_ASSERT(m_impl != NULL);
676 try
677 {
678 m_impl->clear();
679 }
680 catch (boost::exception& e)
681 {
682 e << boost::log::ipc::object_name_info(m_impl->name());
683 throw;
684 }
685 }
686
687 BOOST_LOG_API object_name const& reliable_message_queue::name() const
688 {
689 BOOST_ASSERT(m_impl != NULL);
690 return m_impl->name();
691 }
692
693 BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
694 {
695 BOOST_ASSERT(m_impl != NULL);
696 return m_impl->capacity();
697 }
698
699 BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
700 {
701 BOOST_ASSERT(m_impl != NULL);
702 return m_impl->block_size();
703 }
704
705 BOOST_LOG_API void reliable_message_queue::stop_local()
706 {
707 BOOST_ASSERT(m_impl != NULL);
708 try
709 {
710 m_impl->stop_local();
711 }
712 catch (boost::exception& e)
713 {
714 e << boost::log::ipc::object_name_info(m_impl->name());
715 throw;
716 }
717 }
718
719 BOOST_LOG_API void reliable_message_queue::reset_local()
720 {
721 BOOST_ASSERT(m_impl != NULL);
722 try
723 {
724 m_impl->reset_local();
725 }
726 catch (boost::exception& e)
727 {
728 e << boost::log::ipc::object_name_info(m_impl->name());
729 throw;
730 }
731 }
732
733 BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
734 {
735 delete m_impl;
736 m_impl = NULL;
737 }
738
739 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
740 {
741 BOOST_ASSERT(m_impl != NULL);
742 try
743 {
744 return m_impl->send(message_data, message_size);
745 }
746 catch (boost::exception& e)
747 {
748 e << boost::log::ipc::object_name_info(m_impl->name());
749 throw;
750 }
751 }
752
753 BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
754 {
755 BOOST_ASSERT(m_impl != NULL);
756 try
757 {
758 return m_impl->try_send(message_data, message_size);
759 }
760 catch (boost::exception& e)
761 {
762 e << boost::log::ipc::object_name_info(m_impl->name());
763 throw;
764 }
765 }
766
767 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
768 {
769 BOOST_ASSERT(m_impl != NULL);
770 try
771 {
772 return m_impl->receive(handler, state);
773 }
774 catch (boost::exception& e)
775 {
776 e << boost::log::ipc::object_name_info(m_impl->name());
777 throw;
778 }
779 }
780
781 BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
782 {
783 BOOST_ASSERT(m_impl != NULL);
784 try
785 {
786 return m_impl->try_receive(handler, state);
787 }
788 catch (boost::exception& e)
789 {
790 e << boost::log::ipc::object_name_info(m_impl->name());
791 throw;
792 }
793 }
794
795 //! Fixed buffer receive handler
796 BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
797 {
798 fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
799 if (BOOST_UNLIKELY(size > p->size))
800 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
801
802 std::memcpy(p->data, data, size);
803 p->data += size;
804 p->size -= size;
805 }
806
807 BOOST_LOG_API void reliable_message_queue::remove(object_name const&)
808 {
809 // System objects are reference counted on Windows, nothing to do here
810 }
811
812 } // namespace ipc
813
814 BOOST_LOG_CLOSE_NAMESPACE // namespace log
815
816 } // namespace boost
817
818 #include <boost/log/detail/footer.hpp>