]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/log/src/posix/ipc_reliable_message_queue.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / log / src / posix / ipc_reliable_message_queue.cpp
1 /*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8 /*!
9 * \file posix/ipc_reliable_message_queue.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 17.11.2015
13 *
14 * \brief This header is the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16 *
17 * This file provides an interprocess message queue implementation on POSIX platforms.
18 */
19
20 #include <boost/log/detail/config.hpp>
21 #include <cstddef>
22 #include <cerrno>
23 #include <cstring>
24 #include <new>
25 #include <string>
26 #include <stdexcept>
27 #include <algorithm>
28 #include <unistd.h>
29 #if defined(BOOST_HAS_SCHED_YIELD)
30 #include <sched.h>
31 #elif defined(BOOST_HAS_PTHREAD_YIELD)
32 #include <pthread.h>
33 #elif defined(BOOST_HAS_NANOSLEEP)
34 #include <time.h>
35 #endif
36 #include <boost/assert.hpp>
37 #include <boost/static_assert.hpp>
38 #include <boost/cstdint.hpp>
39 #include <boost/atomic/atomic.hpp>
40 #include <boost/atomic/capabilities.hpp>
41 #include <boost/throw_exception.hpp>
42 #include <boost/log/exceptions.hpp>
43 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
44 #include <boost/log/support/exception.hpp>
45 #include <boost/log/detail/pause.hpp>
46 #include <boost/exception/info.hpp>
47 #include <boost/exception/enable_error_info.hpp>
48 #include <boost/interprocess/creation_tags.hpp>
49 #include <boost/interprocess/exceptions.hpp>
50 #include <boost/interprocess/permissions.hpp>
51 #include <boost/interprocess/mapped_region.hpp>
52 #include <boost/interprocess/shared_memory_object.hpp>
53 #include <boost/align/align_up.hpp>
54 #include "ipc_sync_wrappers.hpp"
55 #include "murmur3.hpp"
56 #include "bit_tools.hpp"
57 #include <boost/log/detail/header.hpp>
58
59 #if BOOST_ATOMIC_INT32_LOCK_FREE != 2
60 // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
61 #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
62 #endif
63
64 namespace boost {
65
66 BOOST_LOG_OPEN_NAMESPACE
67
68 namespace ipc {
69
70 //! Message queue implementation data
71 struct reliable_message_queue::implementation
72 {
73 private:
74 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
75 struct block_header
76 {
77 // Element data alignment, in bytes
78 enum { data_alignment = 32u };
79
80 //! Size of the element data, in bytes
81 size_type m_size;
82
83 //! Returns the block header overhead, in bytes
84 static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
85 {
86 return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment));
87 }
88
89 //! Returns a pointer to the element data
90 void* get_data() const BOOST_NOEXCEPT
91 {
92 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
93 }
94 };
95
96 //! Header of the message queue. Placed at the beginning of the shared memory segment.
97 struct header
98 {
99 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
100 enum { abi_version = 0 };
101
102 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
103
104 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
105 uint32_t m_abi_tag;
106 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
107 unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
108 //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
109 boost::atomic< uint32_t > m_ref_count;
110 //! Number of allocation blocks in the queue.
111 const uint32_t m_capacity;
112 //! Size of an allocation block, in bytes.
113 const size_type m_block_size;
114 //! Mutex for protecting queue data structures.
115 boost::log::ipc::aux::interprocess_mutex m_mutex;
116 //! Condition variable used to block readers when the queue is empty.
117 boost::log::ipc::aux::interprocess_condition_variable m_nonempty_queue;
118 //! Condition variable used to block writers when the queue is full.
119 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
120 //! The current number of allocated blocks in the queue.
121 uint32_t m_size;
122 //! The current writing position (allocation block index).
123 uint32_t m_put_pos;
124 //! The current reading position (allocation block index).
125 uint32_t m_get_pos;
126
127 header(uint32_t capacity, size_type block_size) :
128 m_abi_tag(get_abi_tag()),
129 m_capacity(capacity),
130 m_block_size(block_size),
131 m_size(0u),
132 m_put_pos(0u),
133 m_get_pos(0u)
134 {
135 // Must be initialized last. m_ref_count is zero-initialized initially.
136 m_ref_count.fetch_add(1u, boost::memory_order_release);
137 }
138
139 //! Returns the header structure ABI tag
140 static uint32_t get_abi_tag() BOOST_NOEXCEPT
141 {
142 // This FOURCC identifies the queue type
143 boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
144
145 // This FOURCC identifies the queue implementation
146 hash.mix(boost::log::aux::make_fourcc('p', 't', 'h', 'r'));
147 hash.mix(abi_version);
148
149 // We will use these constants to align pointers
150 hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
151 hash.mix(block_header::data_alignment);
152
153 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
154 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
155
156 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
157 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
158 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
159
160 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
161 BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
162 BOOST_LOG_MIX_HEADER_MEMBER(m_ref_count);
163 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
164 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
165 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex);
166 BOOST_LOG_MIX_HEADER_MEMBER(m_nonempty_queue);
167 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue);
168 BOOST_LOG_MIX_HEADER_MEMBER(m_size);
169 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
170 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
171
172 #undef BOOST_LOG_MIX_HEADER_MEMBER
173
174 return hash.finalize();
175 }
176
177 //! Returns an element header at the specified index
178 block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
179 {
180 BOOST_ASSERT(index < m_capacity);
181 unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
182 p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
183 return reinterpret_cast< block_header* >(p);
184 }
185
186 BOOST_DELETED_FUNCTION(header(header const&))
187 BOOST_DELETED_FUNCTION(header& operator=(header const&))
188 };
189
190 private:
191 //! Shared memory object
192 boost::interprocess::shared_memory_object m_shared_memory;
193 //! Shared memory mapping into the process address space
194 boost::interprocess::mapped_region m_region;
195 //! Queue overflow handling policy
196 const overflow_policy m_overflow_policy;
197 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
198 size_type m_block_size_mask;
199 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
200 uint32_t m_block_size_log2;
201 //! The flag indicates that stop has been requested
202 bool m_stop;
203
204 //! Queue shared memory object name
205 const object_name m_name;
206
207 public:
208 //! The constructor creates a new shared memory segment
209 implementation
210 (
211 open_mode::create_only_tag,
212 object_name const& name,
213 uint32_t capacity,
214 size_type block_size,
215 overflow_policy oflow_policy,
216 permissions const& perms
217 ) :
218 m_shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
219 m_region(),
220 m_overflow_policy(oflow_policy),
221 m_block_size_mask(0u),
222 m_block_size_log2(0u),
223 m_stop(false),
224 m_name(name)
225 {
226 create_region(capacity, block_size);
227 }
228
229 //! The constructor creates a new shared memory segment or opens the existing one
230 implementation
231 (
232 open_mode::open_or_create_tag,
233 object_name const& name,
234 uint32_t capacity,
235 size_type block_size,
236 overflow_policy oflow_policy,
237 permissions const& perms
238 ) :
239 m_shared_memory(boost::interprocess::open_or_create, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
240 m_region(),
241 m_overflow_policy(oflow_policy),
242 m_block_size_mask(0u),
243 m_block_size_log2(0u),
244 m_stop(false),
245 m_name(name)
246 {
247 boost::interprocess::offset_t shmem_size = 0;
248 if (!m_shared_memory.get_size(shmem_size) || shmem_size == 0)
249 create_region(capacity, block_size);
250 else
251 adopt_region(shmem_size);
252 }
253
254 //! The constructor opens the existing shared memory segment
255 implementation
256 (
257 open_mode::open_only_tag,
258 object_name const& name,
259 overflow_policy oflow_policy
260 ) :
261 m_shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write),
262 m_region(),
263 m_overflow_policy(oflow_policy),
264 m_block_size_mask(0u),
265 m_block_size_log2(0u),
266 m_stop(false),
267 m_name(name)
268 {
269 boost::interprocess::offset_t shmem_size = 0;
270 if (!m_shared_memory.get_size(shmem_size))
271 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment not found");
272
273 adopt_region(shmem_size);
274 }
275
276 ~implementation()
277 {
278 close_region();
279 }
280
281 object_name const& name() const BOOST_NOEXCEPT
282 {
283 return m_name;
284 }
285
286 uint32_t capacity() const BOOST_NOEXCEPT
287 {
288 return get_header()->m_capacity;
289 }
290
291 size_type block_size() const BOOST_NOEXCEPT
292 {
293 return get_header()->m_block_size;
294 }
295
296 operation_result send(void const* message_data, size_type message_size)
297 {
298 const uint32_t block_count = estimate_block_count(message_size);
299
300 header* const hdr = get_header();
301
302 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
303 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
304
305 if (m_stop)
306 return aborted;
307
308 lock_queue();
309 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
310
311 while (true)
312 {
313 if (m_stop)
314 return aborted;
315
316 if ((hdr->m_capacity - hdr->m_size) >= block_count)
317 break;
318
319 const overflow_policy oflow_policy = m_overflow_policy;
320 if (oflow_policy == fail_on_overflow)
321 return no_space;
322 else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
323 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
324
325 hdr->m_nonfull_queue.wait(hdr->m_mutex);
326 }
327
328 enqueue_message(message_data, message_size, block_count);
329
330 return succeeded;
331 }
332
333 bool try_send(void const* message_data, size_type message_size)
334 {
335 const uint32_t block_count = estimate_block_count(message_size);
336
337 header* const hdr = get_header();
338
339 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
340 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
341
342 if (m_stop)
343 return false;
344
345 lock_queue();
346 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
347
348 if (m_stop)
349 return false;
350
351 if ((hdr->m_capacity - hdr->m_size) < block_count)
352 return false;
353
354 enqueue_message(message_data, message_size, block_count);
355
356 return true;
357 }
358
359 operation_result receive(receive_handler handler, void* state)
360 {
361 if (m_stop)
362 return aborted;
363
364 lock_queue();
365 header* const hdr = get_header();
366 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
367
368 while (true)
369 {
370 if (m_stop)
371 return aborted;
372
373 if (hdr->m_size > 0u)
374 break;
375
376 hdr->m_nonempty_queue.wait(hdr->m_mutex);
377 }
378
379 dequeue_message(handler, state);
380
381 return succeeded;
382 }
383
384 bool try_receive(receive_handler handler, void* state)
385 {
386 if (m_stop)
387 return false;
388
389 lock_queue();
390 header* const hdr = get_header();
391 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
392
393 if (hdr->m_size == 0u)
394 return false;
395
396 dequeue_message(handler, state);
397
398 return true;
399 }
400
401 void stop_local()
402 {
403 if (m_stop)
404 return;
405
406 lock_queue();
407 header* const hdr = get_header();
408 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
409
410 m_stop = true;
411
412 hdr->m_nonempty_queue.notify_all();
413 hdr->m_nonfull_queue.notify_all();
414 }
415
416 void reset_local()
417 {
418 m_stop = false;
419 }
420
421 void clear()
422 {
423 lock_queue();
424 header* const hdr = get_header();
425 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
426 clear_queue();
427 }
428
429 private:
430 header* get_header() const BOOST_NOEXCEPT
431 {
432 return static_cast< header* >(m_region.get_address());
433 }
434
435 static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
436 {
437 return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
438 }
439
440 void create_region(uint32_t capacity, size_type block_size)
441 {
442 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
443 m_shared_memory.truncate(shmem_size);
444 boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
445
446 new (m_region.get_address()) header(capacity, block_size);
447
448 init_block_size(block_size);
449 }
450
451 void adopt_region(std::size_t shmem_size)
452 {
453 if (shmem_size < sizeof(header))
454 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
455
456 boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
457
458 // Wait until the mapped region becomes initialized
459 header* const hdr = get_header();
460 BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 200u, spin_loops = 16u, spins = 16u;
461 for (unsigned int i = 0; i < wait_loops; ++i)
462 {
463 uint32_t ref_count = hdr->m_ref_count.load(boost::memory_order_acquire);
464 while (ref_count > 0u)
465 {
466 if (hdr->m_ref_count.compare_exchange_weak(ref_count, ref_count + 1u, boost::memory_order_acq_rel, boost::memory_order_acquire))
467 goto done;
468 }
469
470 if (i < spin_loops)
471 {
472 for (unsigned int j = 0; j < spins; ++j)
473 {
474 boost::log::aux::pause();
475 }
476 }
477 else
478 {
479 #if defined(BOOST_HAS_SCHED_YIELD)
480 sched_yield();
481 #elif defined(BOOST_HAS_PTHREAD_YIELD)
482 pthread_yield();
483 #elif defined(BOOST_HAS_NANOSLEEP)
484 timespec ts = {};
485 ts.tv_sec = 0;
486 ts.tv_nsec = 1000;
487 nanosleep(&ts, NULL);
488 #else
489 usleep(1);
490 #endif
491 }
492 }
493
494 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
495
496 done:
497 try
498 {
499 // Check that the queue layout matches the current process ABI
500 if (hdr->m_abi_tag != header::get_abi_tag())
501 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
502
503 if (!boost::log::aux::is_power_of_2(hdr->m_block_size))
504 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
505
506 init_block_size(hdr->m_block_size);
507 }
508 catch (...)
509 {
510 close_region();
511 throw;
512 }
513 }
514
515 void close_region() BOOST_NOEXCEPT
516 {
517 header* const hdr = get_header();
518
519 if (hdr->m_ref_count.fetch_sub(1u, boost::memory_order_acq_rel) == 1u)
520 {
521 boost::interprocess::shared_memory_object::remove(m_shared_memory.get_name());
522
523 hdr->~header();
524
525 boost::interprocess::mapped_region().swap(m_region);
526 boost::interprocess::shared_memory_object().swap(m_shared_memory);
527
528 m_block_size_mask = 0u;
529 m_block_size_log2 = 0u;
530 }
531 }
532
533 void init_block_size(size_type block_size)
534 {
535 m_block_size_mask = block_size - 1u;
536
537 uint32_t block_size_log2 = 0u;
538 if ((block_size & 0x0000ffff) == 0u)
539 {
540 block_size >>= 16u;
541 block_size_log2 += 16u;
542 }
543 if ((block_size & 0x000000ff) == 0u)
544 {
545 block_size >>= 8u;
546 block_size_log2 += 8u;
547 }
548 if ((block_size & 0x0000000f) == 0u)
549 {
550 block_size >>= 4u;
551 block_size_log2 += 4u;
552 }
553 if ((block_size & 0x00000003) == 0u)
554 {
555 block_size >>= 2u;
556 block_size_log2 += 2u;
557 }
558 if ((block_size & 0x00000001) == 0u)
559 {
560 ++block_size_log2;
561 }
562 m_block_size_log2 = block_size_log2;
563 }
564
565 void lock_queue()
566 {
567 header* const hdr = get_header();
568
569 #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
570 try
571 {
572 #endif
573 hdr->m_mutex.lock();
574 #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
575 }
576 catch (boost::log::ipc::aux::lock_owner_dead&)
577 {
578 // The mutex is locked by the current thread, but the previous owner terminated without releasing the lock
579 try
580 {
581 clear_queue();
582 hdr->m_mutex.recover();
583 }
584 catch (...)
585 {
586 hdr->m_mutex.unlock();
587 throw;
588 }
589 }
590 #endif
591 }
592
593 void clear_queue()
594 {
595 header* const hdr = get_header();
596 hdr->m_size = 0u;
597 hdr->m_put_pos = 0u;
598 hdr->m_get_pos = 0u;
599 hdr->m_nonfull_queue.notify_all();
600 }
601
602 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
603 uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
604 {
605 // ceil((size + get_header_overhead()) / block_size)
606 return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
607 }
608
609 //! Puts the message to the back of the queue
610 void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
611 {
612 header* const hdr = get_header();
613
614 const uint32_t capacity = hdr->m_capacity;
615 const size_type block_size = hdr->m_block_size;
616 uint32_t pos = hdr->m_put_pos;
617
618 block_header* block = hdr->get_block(pos);
619 block->m_size = message_size;
620
621 size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
622 std::memcpy(block->get_data(), message_data, write_size);
623
624 pos += block_count;
625 if (BOOST_UNLIKELY(pos >= capacity))
626 {
627 // Write the rest of the message at the beginning of the queue
628 pos -= capacity;
629 message_data = static_cast< const unsigned char* >(message_data) + write_size;
630 write_size = message_size - write_size;
631 if (write_size > 0u)
632 std::memcpy(hdr->get_block(0u), message_data, write_size);
633 }
634
635 hdr->m_put_pos = pos;
636
637 const uint32_t old_queue_size = hdr->m_size;
638 hdr->m_size = old_queue_size + block_count;
639 if (old_queue_size == 0u)
640 hdr->m_nonempty_queue.notify_one();
641 }
642
643 //! Retrieves the next message and invokes the handler to store the message contents
644 void dequeue_message(receive_handler handler, void* state)
645 {
646 header* const hdr = get_header();
647
648 const uint32_t capacity = hdr->m_capacity;
649 const size_type block_size = hdr->m_block_size;
650 uint32_t pos = hdr->m_get_pos;
651
652 block_header* block = hdr->get_block(pos);
653 size_type message_size = block->m_size;
654 uint32_t block_count = estimate_block_count(message_size);
655
656 BOOST_ASSERT(block_count <= hdr->m_size);
657
658 size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
659 handler(state, block->get_data(), read_size);
660
661 pos += block_count;
662 if (BOOST_UNLIKELY(pos >= capacity))
663 {
664 // Read the tail of the message
665 pos -= capacity;
666 read_size = message_size - read_size;
667 if (read_size > 0u)
668 handler(state, hdr->get_block(0u), read_size);
669 }
670
671 hdr->m_get_pos = pos;
672 hdr->m_size -= block_count;
673
674 hdr->m_nonfull_queue.notify_all();
675 }
676 };
677
678 BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
679 {
680 BOOST_ASSERT(m_impl == NULL);
681 if (!boost::log::aux::is_power_of_2(block_size))
682 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
683 try
684 {
685 m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
686 }
687 catch (boost::exception& e)
688 {
689 e << boost::log::ipc::object_name_info(name);
690 throw;
691 }
692 catch (boost::interprocess::interprocess_exception& e)
693 {
694 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
695 }
696 }
697
698 BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
699 {
700 BOOST_ASSERT(m_impl == NULL);
701 if (!boost::log::aux::is_power_of_2(block_size))
702 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
703 try
704 {
705 m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
706 }
707 catch (boost::exception& e)
708 {
709 e << boost::log::ipc::object_name_info(name);
710 throw;
711 }
712 catch (boost::interprocess::interprocess_exception& e)
713 {
714 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
715 }
716 }
717
718 BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const&)
719 {
720 BOOST_ASSERT(m_impl == NULL);
721 try
722 {
723 m_impl = new implementation(open_mode::open_only, name, oflow_policy);
724 }
725 catch (boost::exception& e)
726 {
727 e << boost::log::ipc::object_name_info(name);
728 throw;
729 }
730 catch (boost::interprocess::interprocess_exception& e)
731 {
732 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
733 }
734 }
735
736 BOOST_LOG_API void reliable_message_queue::clear()
737 {
738 BOOST_ASSERT(m_impl != NULL);
739 try
740 {
741 m_impl->clear();
742 }
743 catch (boost::exception& e)
744 {
745 e << boost::log::ipc::object_name_info(m_impl->name());
746 throw;
747 }
748 }
749
750 BOOST_LOG_API object_name const& reliable_message_queue::name() const
751 {
752 BOOST_ASSERT(m_impl != NULL);
753 return m_impl->name();
754 }
755
756 BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
757 {
758 BOOST_ASSERT(m_impl != NULL);
759 return m_impl->capacity();
760 }
761
762 BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
763 {
764 BOOST_ASSERT(m_impl != NULL);
765 return m_impl->block_size();
766 }
767
768 BOOST_LOG_API void reliable_message_queue::stop_local()
769 {
770 BOOST_ASSERT(m_impl != NULL);
771 try
772 {
773 m_impl->stop_local();
774 }
775 catch (boost::exception& e)
776 {
777 e << boost::log::ipc::object_name_info(m_impl->name());
778 throw;
779 }
780 }
781
782 BOOST_LOG_API void reliable_message_queue::reset_local()
783 {
784 BOOST_ASSERT(m_impl != NULL);
785 try
786 {
787 m_impl->reset_local();
788 }
789 catch (boost::exception& e)
790 {
791 e << boost::log::ipc::object_name_info(m_impl->name());
792 throw;
793 }
794 }
795
796 BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
797 {
798 delete m_impl;
799 m_impl = NULL;
800 }
801
802 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
803 {
804 BOOST_ASSERT(m_impl != NULL);
805 try
806 {
807 return m_impl->send(message_data, message_size);
808 }
809 catch (boost::exception& e)
810 {
811 e << boost::log::ipc::object_name_info(m_impl->name());
812 throw;
813 }
814 }
815
816 BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
817 {
818 BOOST_ASSERT(m_impl != NULL);
819 try
820 {
821 return m_impl->try_send(message_data, message_size);
822 }
823 catch (boost::exception& e)
824 {
825 e << boost::log::ipc::object_name_info(m_impl->name());
826 throw;
827 }
828 }
829
830 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
831 {
832 BOOST_ASSERT(m_impl != NULL);
833 try
834 {
835 return m_impl->receive(handler, state);
836 }
837 catch (boost::exception& e)
838 {
839 e << boost::log::ipc::object_name_info(m_impl->name());
840 throw;
841 }
842 }
843
844 BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
845 {
846 BOOST_ASSERT(m_impl != NULL);
847 try
848 {
849 return m_impl->try_receive(handler, state);
850 }
851 catch (boost::exception& e)
852 {
853 e << boost::log::ipc::object_name_info(m_impl->name());
854 throw;
855 }
856 }
857
858 //! Fixed buffer receive handler
859 BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
860 {
861 fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
862 if (BOOST_UNLIKELY(size > p->size))
863 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
864
865 std::memcpy(p->data, data, size);
866 p->data += size;
867 p->size -= size;
868 }
869
870 BOOST_LOG_API void reliable_message_queue::remove(object_name const& name)
871 {
872 boost::interprocess::shared_memory_object::remove(name.c_str());
873 }
874
875 } // namespace ipc
876
877 BOOST_LOG_CLOSE_NAMESPACE // namespace log
878
879 } // namespace boost
880
881 #include <boost/log/detail/footer.hpp>