#include <cstddef>
#include <cerrno>
#include <cstring>
+#include <ctime>
#include <new>
#include <string>
#include <stdexcept>
#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/cstdint.hpp>
+#include <boost/memory_order.hpp>
#include <boost/atomic/atomic.hpp>
+#include <boost/atomic/ipc_atomic.hpp>
#include <boost/atomic/capabilities.hpp>
#include <boost/throw_exception.hpp>
#include <boost/log/exceptions.hpp>
#include "bit_tools.hpp"
#include <boost/log/detail/header.hpp>
-#if BOOST_ATOMIC_INT32_LOCK_FREE != 2
-// 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
-#error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
-#endif
-
namespace boost {
BOOST_LOG_OPEN_NAMESPACE
//! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
//! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
- boost::atomic< uint32_t > m_ref_count;
+ boost::ipc_atomic< uint32_t > m_ref_count;
//! Number of allocation blocks in the queue.
const uint32_t m_capacity;
//! Size of an allocation block, in bytes.
m_get_pos(0u)
{
// Must be initialized last. m_ref_count is zero-initialized initially.
- m_ref_count.fetch_add(1u, boost::memory_order_release);
+ m_ref_count.opaque_add(1u, boost::memory_order_release);
}
//! Returns the header structure ABI tag
//! The number of the bit set in block_size (i.e. log base 2 of block_size)
uint32_t m_block_size_log2;
//! The flag indicates that stop has been requested
- bool m_stop;
+ boost::atomic< bool > m_stop;
//! Queue shared memory object name
const object_name m_name;
+ //! The total number of loop iterations in \c adopt_region for waiting for the region initialization to complete
+ static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops = 200u;
+ //! Threshold of the number of loop iterations in \c adopt_region for using pause instructions for yielding
+ static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_pause = 16u;
+ //! Threshold of the number of loop iterations in \c adopt_region for using \c short_yield for yielding
+ static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_short_yield = 64u;
+ //! Timeout, in seconds, for performing shared memory creation/opening loop
+ static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_timeout = 60u;
+ //! The number of short yields to perform during the shared memory creation/opening loop
+ static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_short_yield_loops = 64u;
+
public:
//! The constructor creates a new shared memory segment
implementation
overflow_policy oflow_policy,
permissions const& perms
) :
- m_shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
+ m_shared_memory(),
m_region(),
m_overflow_policy(oflow_policy),
m_block_size_mask(0u),
m_stop(false),
m_name(name)
{
+ BOOST_ASSERT(block_size >= block_header::get_header_overhead());
+
+ boost::interprocess::permissions ipc_perms(perms.get_native());
+ while (true)
+ {
+ try
+ {
+ boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
+ m_shared_memory.swap(shared_memory);
+ break;
+ }
+ catch (boost::interprocess::interprocess_exception& e)
+ {
+ // shared_memory_object does not handle EINTR returned from shm_open internally.
+ // https://github.com/boostorg/interprocess/issues/152
+ if (e.get_native_error() != EINTR)
+ throw;
+ }
+ }
+
create_region(capacity, block_size);
}
overflow_policy oflow_policy,
permissions const& perms
) :
- m_shared_memory(boost::interprocess::open_or_create, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
+ m_shared_memory(),
m_region(),
m_overflow_policy(oflow_policy),
m_block_size_mask(0u),
m_stop(false),
m_name(name)
{
- boost::interprocess::offset_t shmem_size = 0;
- if (!m_shared_memory.get_size(shmem_size) || shmem_size == 0)
+ BOOST_ASSERT(block_size >= block_header::get_header_overhead());
+
+ // We need to know for certain whether we create the shared memory segment or open an existing one.
+ // This is to ensure that only one thread initializes the segment and all other threads wait until completion.
+ // Since shared_memory_object(open_or_create) constructor does not report whether the segment was actually created,
+ // we have to loop trying to create or open the segment. https://github.com/boostorg/interprocess/issues/151
+ boost::interprocess::permissions ipc_perms(perms.get_native());
+ bool created = false;
+ unsigned int i = 0u;
+ std::time_t start_time = std::time(NULL);
+ while (true)
+ {
+ while (true)
+ {
+ try
+ {
+ boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
+ m_shared_memory.swap(shared_memory);
+ created = true;
+ goto done;
+ }
+ catch (boost::interprocess::interprocess_exception& e)
+ {
+ if (e.get_error_code() == boost::interprocess::already_exists_error)
+ break;
+
+ // shared_memory_object does not handle EINTR returned from shm_open internally.
+ // https://github.com/boostorg/interprocess/issues/152
+ if (e.get_native_error() != EINTR)
+ throw;
+ }
+ }
+
+ while (true)
+ {
+ try
+ {
+ boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
+ m_shared_memory.swap(shared_memory);
+ created = false;
+ goto done;
+ }
+ catch (boost::interprocess::interprocess_exception& e)
+ {
+ if (e.get_error_code() == boost::interprocess::not_found_error)
+ break;
+
+ if (e.get_native_error() != EINTR)
+ throw;
+ }
+ }
+
+ std::time_t now = std::time(NULL);
+ if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
+ BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be created or opened: shared memory segment failed to be created or opened until timeout (possible livelock)");
+
+ if (i < region_open_or_create_short_yield_loops)
+ short_yield();
+ else
+ long_yield();
+
+ ++i;
+ }
+
+ done:
+ if (created)
create_region(capacity, block_size);
else
- adopt_region(shmem_size);
+ adopt_region();
}
//! The constructor opens the existing shared memory segment
object_name const& name,
overflow_policy oflow_policy
) :
- m_shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write),
+ m_shared_memory(),
m_region(),
m_overflow_policy(oflow_policy),
m_block_size_mask(0u),
m_stop(false),
m_name(name)
{
- boost::interprocess::offset_t shmem_size = 0;
- if (!m_shared_memory.get_size(shmem_size))
- BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment not found");
+ while (true)
+ {
+ try
+ {
+ boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
+ m_shared_memory.swap(shared_memory);
+ break;
+ }
+ catch (boost::interprocess::interprocess_exception& e)
+ {
+ // shared_memory_object does not handle EINTR returned from shm_open internally.
+ // https://github.com/boostorg/interprocess/issues/152
+ if (e.get_native_error() != EINTR)
+ throw;
+ }
+ }
- adopt_region(shmem_size);
+ adopt_region();
}
~implementation()
if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return aborted;
lock_queue();
while (true)
{
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return aborted;
if ((hdr->m_capacity - hdr->m_size) >= block_count)
if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return false;
lock_queue();
boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return false;
if ((hdr->m_capacity - hdr->m_size) < block_count)
operation_result receive(receive_handler handler, void* state)
{
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return aborted;
lock_queue();
while (true)
{
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return aborted;
if (hdr->m_size > 0u)
bool try_receive(receive_handler handler, void* state)
{
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return false;
lock_queue();
void stop_local()
{
- if (m_stop)
+ if (m_stop.load(boost::memory_order_relaxed))
return;
lock_queue();
header* const hdr = get_header();
boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
- m_stop = true;
+ m_stop.store(true, boost::memory_order_relaxed);
hdr->m_nonempty_queue.notify_all();
hdr->m_nonfull_queue.notify_all();
void reset_local()
{
- m_stop = false;
+ m_stop.store(false, boost::memory_order_relaxed);
}
void clear()
init_block_size(block_size);
}
- void adopt_region(std::size_t shmem_size)
+ void adopt_region()
{
- if (shmem_size < sizeof(header))
+ std::size_t shmem_size = 0u;
+ unsigned int i = 0u;
+ std::time_t start_time = std::time(NULL);
+ while (true)
+ {
+ boost::interprocess::offset_t shm_size = 0;
+ const bool get_size_result = m_shared_memory.get_size(shm_size);
+ if (BOOST_LIKELY(get_size_result && shm_size > 0))
+ {
+ shmem_size = static_cast< std::size_t >(shm_size);
+ break;
+ }
+
+ std::time_t now = std::time(NULL);
+ if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
+ {
+ if (get_size_result)
+ goto shmem_size_too_small;
+ BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size could not be determined until timeout");
+ }
+
+ if (i < region_open_or_create_short_yield_loops)
+ short_yield();
+ else
+ long_yield();
+
+ ++i;
+ }
+
+ if (BOOST_UNLIKELY(shmem_size < sizeof(header)))
+ {
+ shmem_size_too_small:
BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
+ }
- boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
+ boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0, shmem_size).swap(m_region);
// Wait until the mapped region becomes initialized
header* const hdr = get_header();
- BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 200u, spin_loops = 16u, spins = 16u;
- for (unsigned int i = 0; i < wait_loops; ++i)
+ for (i = 0u; i < region_init_wait_loops; ++i)
{
uint32_t ref_count = hdr->m_ref_count.load(boost::memory_order_acquire);
while (ref_count > 0u)
goto done;
}
- if (i < spin_loops)
- {
- for (unsigned int j = 0; j < spins; ++j)
- {
- boost::log::aux::pause();
- }
- }
+ if (i < region_init_wait_loops_pause)
+ boost::log::aux::pause();
+ else if (i < region_init_wait_loops_short_yield)
+ short_yield();
else
- {
-#if defined(BOOST_HAS_SCHED_YIELD)
- sched_yield();
-#elif defined(BOOST_HAS_PTHREAD_YIELD)
- pthread_yield();
-#elif defined(BOOST_HAS_NANOSLEEP)
- timespec ts = {};
- ts.tv_sec = 0;
- ts.tv_nsec = 1000;
- nanosleep(&ts, NULL);
-#else
- usleep(1);
-#endif
- }
+ long_yield();
}
BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
const uint32_t capacity = hdr->m_capacity;
const size_type block_size = hdr->m_block_size;
uint32_t pos = hdr->m_put_pos;
+ BOOST_ASSERT(pos < capacity);
block_header* block = hdr->get_block(pos);
block->m_size = message_size;
const uint32_t capacity = hdr->m_capacity;
const size_type block_size = hdr->m_block_size;
uint32_t pos = hdr->m_get_pos;
+ BOOST_ASSERT(pos < capacity);
block_header* block = hdr->get_block(pos);
size_type message_size = block->m_size;
hdr->m_nonfull_queue.notify_all();
}
+
+ static void short_yield() BOOST_NOEXCEPT
+ {
+#if defined(BOOST_HAS_SCHED_YIELD)
+ sched_yield();
+#elif defined(BOOST_HAS_PTHREAD_YIELD)
+ pthread_yield();
+#else
+ long_yield();
+#endif
+ }
+
+ static void long_yield() BOOST_NOEXCEPT
+ {
+#if defined(BOOST_HAS_NANOSLEEP)
+ timespec ts = {};
+ ts.tv_sec = 0;
+ ts.tv_nsec = 1000;
+ nanosleep(&ts, NULL);
+#else
+ usleep(1);
+#endif
+ }
};
BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)