]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/log/src/windows/ipc_reliable_message_queue.cpp
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / boost / libs / log / src / windows / ipc_reliable_message_queue.cpp
1 /*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8 /*!
9 * \file ipc_reliable_message_queue.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 28.10.2015
13 *
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16 *
17 * This file provides an interprocess message queue implementation on Windows platforms.
18 */
19
20 #include <boost/log/detail/config.hpp>
21 #include <cstddef>
22 #include <cstring>
23 #include <new>
24 #include <limits>
25 #include <string>
26 #include <algorithm>
27 #include <stdexcept>
28 #include <boost/assert.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/cstdint.hpp>
31 #include <boost/memory_order.hpp>
32 #include <boost/atomic/ipc_atomic.hpp>
33 #include <boost/atomic/capabilities.hpp>
34 #include <boost/log/exceptions.hpp>
35 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
36 #include <boost/log/support/exception.hpp>
37 #include <boost/log/detail/pause.hpp>
38 #include <boost/exception/info.hpp>
39 #include <boost/exception/enable_error_info.hpp>
40 #include <boost/align/align_up.hpp>
41 #include <boost/winapi/thread.hpp> // SwitchToThread
42 #include "windows/ipc_sync_wrappers.hpp"
43 #include "windows/mapped_shared_memory.hpp"
44 #include "windows/utf_code_conversion.hpp"
45 #include "murmur3.hpp"
46 #include "bit_tools.hpp"
47 #include <windows.h>
48 #include <boost/log/detail/header.hpp>
49
50 //! A suffix appended to the names of the auxiliary interprocess objects (mutex, event, condition variable) created by the queue.
51 //! Used as a protection against clashing with user-supplied names of interprocess queues and also to resolve conflicts between queues of different types.
52 #define BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".3010b9950926463398eee00b35b44651"
53
54 namespace boost {
55
56 BOOST_LOG_OPEN_NAMESPACE
57
58 namespace ipc {
59
60 //! Message queue implementation data
61 struct reliable_message_queue::implementation
62 {
63 private:
64 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
65 struct block_header
66 {
67 // Element data alignment, in bytes
68 enum { data_alignment = 32u };
69
70 //! Size of the element data, in bytes
71 size_type m_size;
72
73 //! Returns the block header overhead, in bytes
74 static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
75 {
76 return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment));
77 }
78
79 //! Returns a pointer to the element data
80 void* get_data() const BOOST_NOEXCEPT
81 {
82 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
83 }
84 };
85
86 //! Header of the message queue. Placed at the beginning of the shared memory segment.
87 struct header
88 {
89 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
90 enum { abi_version = 0 };
91
92 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
93
94 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
95 uint32_t m_abi_tag;
96 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
97 unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
98 //! A flag indicating that the queue is constructed (i.e. the queue is constructed when the value is not 0).
99 boost::ipc_atomic< uint32_t > m_initialized;
100 //! Number of allocation blocks in the queue.
101 const uint32_t m_capacity;
102 //! Size of an allocation block, in bytes.
103 const size_type m_block_size;
104 //! Shared state of the mutex for protecting queue data structures.
105 boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state;
106 //! Shared state of the condition variable used to block writers when the queue is full.
107 boost::log::ipc::aux::interprocess_condition_variable::shared_state m_nonfull_queue_state;
108 //! The current number of allocated blocks in the queue.
109 uint32_t m_size;
110 //! The current writing position (allocation block index).
111 uint32_t m_put_pos;
112 //! The current reading position (allocation block index).
113 uint32_t m_get_pos;
114
115 header(uint32_t capacity, size_type block_size) :
116 m_abi_tag(get_abi_tag()),
117 m_capacity(capacity),
118 m_block_size(block_size),
119 m_size(0u),
120 m_put_pos(0u),
121 m_get_pos(0u)
122 {
123 // Must be initialized last. m_initialized is zero-initialized initially.
124 m_initialized.opaque_add(1u, boost::memory_order_release);
125 }
126
127 //! Returns the header structure ABI tag
128 static uint32_t get_abi_tag() BOOST_NOEXCEPT
129 {
130 // This FOURCC identifies the queue type
131 boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
132
133 // This FOURCC identifies the queue implementation
134 hash.mix(boost::log::aux::make_fourcc('w', 'n', 't', '5'));
135 hash.mix(abi_version);
136
137 // We will use these constants to align pointers
138 hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
139 hash.mix(block_header::data_alignment);
140
141 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
142 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
143
144 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
145 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
146 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
147
148 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
149 BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
150 BOOST_LOG_MIX_HEADER_MEMBER(m_initialized);
151 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
152 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
153 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex_state);
154 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue_state);
155 BOOST_LOG_MIX_HEADER_MEMBER(m_size);
156 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
157 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
158
159 #undef BOOST_LOG_MIX_HEADER_MEMBER
160
161 return hash.finalize();
162 }
163
164 //! Returns an element header at the specified index
165 block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
166 {
167 BOOST_ASSERT(index < m_capacity);
168 unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
169 p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
170 return reinterpret_cast< block_header* >(p);
171 }
172
173 BOOST_DELETED_FUNCTION(header(header const&))
174 BOOST_DELETED_FUNCTION(header& operator=(header const&))
175 };
176
177 private:
178 //! Shared memory object and mapping
179 boost::log::ipc::aux::mapped_shared_memory m_shared_memory;
180 //! Queue overflow handling policy
181 const overflow_policy m_overflow_policy;
182 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
183 size_type m_block_size_mask;
184 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
185 uint32_t m_block_size_log2;
186
187 //! Mutex for protecting queue data structures.
188 boost::log::ipc::aux::interprocess_mutex m_mutex;
189 //! Event used to block readers when the queue is empty.
190 boost::log::ipc::aux::interprocess_event m_nonempty_queue;
191 //! Condition variable used to block writers when the queue is full.
192 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
193 //! The event indicates that stop has been requested
194 boost::log::ipc::aux::auto_handle m_stop;
195
196 //! The queue name, as specified by the user
197 const object_name m_name;
198
199 public:
200 //! The constructor creates a new shared memory segment
201 implementation
202 (
203 open_mode::create_only_tag,
204 object_name const& name,
205 uint32_t capacity,
206 size_type block_size,
207 overflow_policy oflow_policy,
208 permissions const& perms
209 ) :
210 m_overflow_policy(oflow_policy),
211 m_block_size_mask(0u),
212 m_block_size_log2(0u),
213 m_name(name)
214 {
215 BOOST_ASSERT(block_size >= block_header::get_header_overhead());
216 const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
217 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
218 m_shared_memory.create(wname.c_str(), shmem_size, perms);
219 m_shared_memory.map();
220
221 create_queue(wname, capacity, block_size, perms);
222 }
223
224 //! The constructor creates a new shared memory segment or opens the existing one
225 implementation
226 (
227 open_mode::open_or_create_tag,
228 object_name const& name,
229 uint32_t capacity,
230 size_type block_size,
231 overflow_policy oflow_policy,
232 permissions const& perms
233 ) :
234 m_overflow_policy(oflow_policy),
235 m_block_size_mask(0u),
236 m_block_size_log2(0u),
237 m_name(name)
238 {
239 BOOST_ASSERT(block_size >= block_header::get_header_overhead());
240 const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
241 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
242 const bool created = m_shared_memory.create_or_open(wname.c_str(), shmem_size, perms);
243 m_shared_memory.map();
244
245 if (created)
246 create_queue(wname, capacity, block_size, perms);
247 else
248 adopt_queue(wname, m_shared_memory.size(), perms);
249 }
250
251 //! The constructor opens the existing shared memory segment
252 implementation
253 (
254 open_mode::open_only_tag,
255 object_name const& name,
256 overflow_policy oflow_policy,
257 permissions const& perms
258 ) :
259 m_overflow_policy(oflow_policy),
260 m_block_size_mask(0u),
261 m_block_size_log2(0u),
262 m_name(name)
263 {
264 const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
265 m_shared_memory.open(wname.c_str());
266 m_shared_memory.map();
267
268 adopt_queue(wname, m_shared_memory.size(), perms);
269 }
270
271 object_name const& name() const BOOST_NOEXCEPT
272 {
273 return m_name;
274 }
275
276 uint32_t capacity() const BOOST_NOEXCEPT
277 {
278 return get_header()->m_capacity;
279 }
280
281 size_type block_size() const BOOST_NOEXCEPT
282 {
283 return get_header()->m_block_size;
284 }
285
286 operation_result send(void const* message_data, size_type message_size)
287 {
288 const uint32_t block_count = estimate_block_count(message_size);
289
290 header* const hdr = get_header();
291
292 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
293 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
294
295 if (!lock_queue())
296 return aborted;
297
298 boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex);
299
300 while (true)
301 {
302 if ((hdr->m_capacity - hdr->m_size) >= block_count)
303 break;
304
305 const overflow_policy oflow_policy = m_overflow_policy;
306 if (oflow_policy == fail_on_overflow)
307 return no_space;
308 else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
309 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
310
311 if (!m_nonfull_queue.wait(unlock, m_stop.get()))
312 return aborted;
313 }
314
315 enqueue_message(message_data, message_size, block_count);
316
317 return succeeded;
318 }
319
320 bool try_send(void const* message_data, size_type message_size)
321 {
322 const uint32_t block_count = estimate_block_count(message_size);
323
324 header* const hdr = get_header();
325
326 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
327 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
328
329 if (!lock_queue())
330 return false;
331
332 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
333
334 if ((hdr->m_capacity - hdr->m_size) < block_count)
335 return false;
336
337 enqueue_message(message_data, message_size, block_count);
338
339 return true;
340 }
341
342 operation_result receive(receive_handler handler, void* state)
343 {
344 if (!lock_queue())
345 return aborted;
346
347 boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex);
348
349 header* const hdr = get_header();
350
351 while (true)
352 {
353 if (hdr->m_size > 0u)
354 break;
355
356 m_mutex.unlock();
357 unlock.disengage();
358
359 if (!m_nonempty_queue.wait(m_stop.get()) || !lock_queue())
360 return aborted;
361
362 unlock.engage(m_mutex);
363 }
364
365 dequeue_message(handler, state);
366
367 return succeeded;
368 }
369
370 bool try_receive(receive_handler handler, void* state)
371 {
372 if (!lock_queue())
373 return false;
374
375 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
376
377 header* const hdr = get_header();
378 if (hdr->m_size == 0u)
379 return false;
380
381 dequeue_message(handler, state);
382
383 return true;
384 }
385
386 void stop_local()
387 {
388 BOOST_VERIFY(boost::winapi::SetEvent(m_stop.get()) != 0);
389 }
390
391 void reset_local()
392 {
393 BOOST_VERIFY(boost::winapi::ResetEvent(m_stop.get()) != 0);
394 }
395
396 void clear()
397 {
398 m_mutex.lock();
399 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
400 clear_queue();
401 }
402
403 private:
404 header* get_header() const BOOST_NOEXCEPT
405 {
406 return static_cast< header* >(m_shared_memory.address());
407 }
408
409 static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
410 {
411 return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
412 }
413
414 void create_stop_event()
415 {
416 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
417 boost::winapi::HANDLE_ h = boost::winapi::CreateEventExW
418 (
419 NULL, // permissions
420 NULL, // name
421 boost::winapi::CREATE_EVENT_MANUAL_RESET_,
422 boost::winapi::SYNCHRONIZE_ | boost::winapi::EVENT_MODIFY_STATE_
423 );
424 #else
425 boost::winapi::HANDLE_ h = boost::winapi::CreateEventW
426 (
427 NULL, // permissions
428 true, // manual reset
429 false, // initial state
430 NULL // name
431 );
432 #endif
433 if (BOOST_UNLIKELY(h == NULL))
434 {
435 boost::winapi::DWORD_ err = boost::winapi::GetLastError();
436 BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an stop event object", (err));
437 }
438
439 m_stop.init(h);
440 }
441
442 void create_queue(std::wstring const& name, uint32_t capacity, size_type block_size, permissions const& perms)
443 {
444 // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized
445 header* const hdr = get_header();
446 m_mutex.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state, perms);
447 m_nonempty_queue.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str(), false, perms);
448 m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms);
449 create_stop_event();
450
451 new (hdr) header(capacity, block_size);
452
453 init_block_size(block_size);
454 }
455
456 void adopt_queue(std::wstring const& name, std::size_t shmem_size, permissions const& perms)
457 {
458 if (shmem_size < sizeof(header))
459 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
460
461 // Wait until the mapped region becomes initialized
462 header* const hdr = get_header();
463 BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 1000u, spin_loops = 16u, spins = 16u;
464 for (unsigned int i = 0; i < wait_loops; ++i)
465 {
466 uint32_t initialized = hdr->m_initialized.load(boost::memory_order_acquire);
467 if (initialized)
468 {
469 goto done;
470 }
471
472 if (i < spin_loops)
473 {
474 for (unsigned int j = 0; j < spins; ++j)
475 {
476 boost::log::aux::pause();
477 }
478 }
479 else
480 {
481 boost::winapi::SwitchToThread();
482 }
483 }
484
485 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
486
487 done:
488 // Check that the queue layout matches the current process ABI
489 if (hdr->m_abi_tag != header::get_abi_tag())
490 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
491
492 if (!boost::log::aux::is_power_of_2(hdr->m_block_size))
493 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
494
495 m_mutex.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state);
496 m_nonempty_queue.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str());
497 m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms);
498 create_stop_event();
499
500 init_block_size(hdr->m_block_size);
501 }
502
503 void init_block_size(size_type block_size)
504 {
505 m_block_size_mask = block_size - 1u;
506
507 uint32_t block_size_log2 = 0u;
508 if ((block_size & 0x0000ffff) == 0u)
509 {
510 block_size >>= 16u;
511 block_size_log2 += 16u;
512 }
513 if ((block_size & 0x000000ff) == 0u)
514 {
515 block_size >>= 8u;
516 block_size_log2 += 8u;
517 }
518 if ((block_size & 0x0000000f) == 0u)
519 {
520 block_size >>= 4u;
521 block_size_log2 += 4u;
522 }
523 if ((block_size & 0x00000003) == 0u)
524 {
525 block_size >>= 2u;
526 block_size_log2 += 2u;
527 }
528 if ((block_size & 0x00000001) == 0u)
529 {
530 ++block_size_log2;
531 }
532 m_block_size_log2 = block_size_log2;
533 }
534
535 bool lock_queue()
536 {
537 return m_mutex.lock(m_stop.get());
538 }
539
540 void clear_queue()
541 {
542 header* const hdr = get_header();
543 hdr->m_size = 0u;
544 hdr->m_put_pos = 0u;
545 hdr->m_get_pos = 0u;
546 m_nonfull_queue.notify_all();
547 }
548
549 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
550 uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
551 {
552 // ceil((size + get_header_overhead()) / block_size)
553 return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
554 }
555
556 //! Puts the message to the back of the queue
557 void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
558 {
559 header* const hdr = get_header();
560
561 const uint32_t capacity = hdr->m_capacity;
562 const size_type block_size = hdr->m_block_size;
563 uint32_t pos = hdr->m_put_pos;
564 BOOST_ASSERT(pos < capacity);
565
566 block_header* block = hdr->get_block(pos);
567 block->m_size = message_size;
568
569 size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
570 std::memcpy(block->get_data(), message_data, write_size);
571
572 pos += block_count;
573 if (BOOST_UNLIKELY(pos >= capacity))
574 {
575 // Write the rest of the message at the beginning of the queue
576 pos -= capacity;
577 message_data = static_cast< const unsigned char* >(message_data) + write_size;
578 write_size = message_size - write_size;
579 if (write_size > 0u)
580 std::memcpy(hdr->get_block(0u), message_data, write_size);
581 }
582
583 hdr->m_put_pos = pos;
584
585 const uint32_t old_queue_size = hdr->m_size;
586 hdr->m_size = old_queue_size + block_count;
587 if (old_queue_size == 0u)
588 m_nonempty_queue.set();
589 }
590
591 //! Retrieves the next message and invokes the handler to store the message contents
592 void dequeue_message(receive_handler handler, void* state)
593 {
594 header* const hdr = get_header();
595
596 const uint32_t capacity = hdr->m_capacity;
597 const size_type block_size = hdr->m_block_size;
598 uint32_t pos = hdr->m_get_pos;
599 BOOST_ASSERT(pos < capacity);
600
601 block_header* block = hdr->get_block(pos);
602 size_type message_size = block->m_size;
603 uint32_t block_count = estimate_block_count(message_size);
604
605 BOOST_ASSERT(block_count <= hdr->m_size);
606
607 size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
608 handler(state, block->get_data(), read_size);
609
610 pos += block_count;
611 if (BOOST_UNLIKELY(pos >= capacity))
612 {
613 // Read the tail of the message
614 pos -= capacity;
615 read_size = message_size - read_size;
616 if (read_size > 0u)
617 handler(state, hdr->get_block(0u), read_size);
618 }
619
620 hdr->m_get_pos = pos;
621 hdr->m_size -= block_count;
622
623 m_nonfull_queue.notify_all();
624 }
625 };
626
627 BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
628 {
629 BOOST_ASSERT(m_impl == NULL);
630 if (!boost::log::aux::is_power_of_2(block_size))
631 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
632 try
633 {
634 m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
635 }
636 catch (boost::exception& e)
637 {
638 e << boost::log::ipc::object_name_info(name);
639 throw;
640 }
641 }
642
643 BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
644 {
645 BOOST_ASSERT(m_impl == NULL);
646 if (!boost::log::aux::is_power_of_2(block_size))
647 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
648 try
649 {
650 m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
651 }
652 catch (boost::exception& e)
653 {
654 e << boost::log::ipc::object_name_info(name);
655 throw;
656 }
657 }
658
659 BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const& perms)
660 {
661 BOOST_ASSERT(m_impl == NULL);
662 try
663 {
664 m_impl = new implementation(open_mode::open_only, name, oflow_policy, perms);
665 }
666 catch (boost::exception& e)
667 {
668 e << boost::log::ipc::object_name_info(name);
669 throw;
670 }
671 }
672
673 BOOST_LOG_API void reliable_message_queue::clear()
674 {
675 BOOST_ASSERT(m_impl != NULL);
676 try
677 {
678 m_impl->clear();
679 }
680 catch (boost::exception& e)
681 {
682 e << boost::log::ipc::object_name_info(m_impl->name());
683 throw;
684 }
685 }
686
687 BOOST_LOG_API object_name const& reliable_message_queue::name() const
688 {
689 BOOST_ASSERT(m_impl != NULL);
690 return m_impl->name();
691 }
692
693 BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
694 {
695 BOOST_ASSERT(m_impl != NULL);
696 return m_impl->capacity();
697 }
698
699 BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
700 {
701 BOOST_ASSERT(m_impl != NULL);
702 return m_impl->block_size();
703 }
704
705 BOOST_LOG_API void reliable_message_queue::stop_local()
706 {
707 BOOST_ASSERT(m_impl != NULL);
708 try
709 {
710 m_impl->stop_local();
711 }
712 catch (boost::exception& e)
713 {
714 e << boost::log::ipc::object_name_info(m_impl->name());
715 throw;
716 }
717 }
718
719 BOOST_LOG_API void reliable_message_queue::reset_local()
720 {
721 BOOST_ASSERT(m_impl != NULL);
722 try
723 {
724 m_impl->reset_local();
725 }
726 catch (boost::exception& e)
727 {
728 e << boost::log::ipc::object_name_info(m_impl->name());
729 throw;
730 }
731 }
732
733 BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
734 {
735 delete m_impl;
736 m_impl = NULL;
737 }
738
739 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
740 {
741 BOOST_ASSERT(m_impl != NULL);
742 try
743 {
744 return m_impl->send(message_data, message_size);
745 }
746 catch (boost::exception& e)
747 {
748 e << boost::log::ipc::object_name_info(m_impl->name());
749 throw;
750 }
751 }
752
753 BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
754 {
755 BOOST_ASSERT(m_impl != NULL);
756 try
757 {
758 return m_impl->try_send(message_data, message_size);
759 }
760 catch (boost::exception& e)
761 {
762 e << boost::log::ipc::object_name_info(m_impl->name());
763 throw;
764 }
765 }
766
767 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
768 {
769 BOOST_ASSERT(m_impl != NULL);
770 try
771 {
772 return m_impl->receive(handler, state);
773 }
774 catch (boost::exception& e)
775 {
776 e << boost::log::ipc::object_name_info(m_impl->name());
777 throw;
778 }
779 }
780
781 BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
782 {
783 BOOST_ASSERT(m_impl != NULL);
784 try
785 {
786 return m_impl->try_receive(handler, state);
787 }
788 catch (boost::exception& e)
789 {
790 e << boost::log::ipc::object_name_info(m_impl->name());
791 throw;
792 }
793 }
794
795 //! Fixed buffer receive handler
796 BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
797 {
798 fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
799 if (BOOST_UNLIKELY(size > p->size))
800 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
801
802 std::memcpy(p->data, data, size);
803 p->data += size;
804 p->size -= size;
805 }
806
807 BOOST_LOG_API void reliable_message_queue::remove(object_name const&)
808 {
809 // System objects are reference counted on Windows, nothing to do here
810 }
811
812 } // namespace ipc
813
814 BOOST_LOG_CLOSE_NAMESPACE // namespace log
815
816 } // namespace boost
817
818 #include <boost/log/detail/footer.hpp>