]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/libs/log/src/posix/ipc_reliable_message_queue.cpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / libs / log / src / posix / ipc_reliable_message_queue.cpp
index 07d61fa55f95862d73be116b24eb1c4899edb04d..39333834b54874fad6afbd13b30f1b73cd33f123 100644 (file)
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <cerrno>
 #include <cstring>
+#include <ctime>
 #include <new>
 #include <string>
 #include <stdexcept>
@@ -36,7 +37,9 @@
 #include <boost/assert.hpp>
 #include <boost/static_assert.hpp>
 #include <boost/cstdint.hpp>
+#include <boost/memory_order.hpp>
 #include <boost/atomic/atomic.hpp>
+#include <boost/atomic/ipc_atomic.hpp>
 #include <boost/atomic/capabilities.hpp>
 #include <boost/throw_exception.hpp>
 #include <boost/log/exceptions.hpp>
 #include "bit_tools.hpp"
 #include <boost/log/detail/header.hpp>
 
-#if BOOST_ATOMIC_INT32_LOCK_FREE != 2
-// 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
-#error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
-#endif
-
 namespace boost {
 
 BOOST_LOG_OPEN_NAMESPACE
@@ -106,7 +104,7 @@ private:
         //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
         unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
         //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
-        boost::atomic< uint32_t > m_ref_count;
+        boost::ipc_atomic< uint32_t > m_ref_count;
         //! Number of allocation blocks in the queue.
         const uint32_t m_capacity;
         //! Size of an allocation block, in bytes.
@@ -133,7 +131,7 @@ private:
             m_get_pos(0u)
         {
             // Must be initialized last. m_ref_count is zero-initialized initially.
-            m_ref_count.fetch_add(1u, boost::memory_order_release);
+            m_ref_count.opaque_add(1u, boost::memory_order_release);
         }
 
         //! Returns the header structure ABI tag
@@ -199,11 +197,22 @@ private:
     //! The number of the bit set in block_size (i.e. log base 2 of block_size)
     uint32_t m_block_size_log2;
     //! The flag indicates that stop has been requested
-    bool m_stop;
+    boost::atomic< bool > m_stop;
 
     //! Queue shared memory object name
     const object_name m_name;
 
+    //! The total number of loop iterations in \c adopt_region for waiting for the region initialization to complete
+    static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops = 200u;
+    //! Threshold of the number of loop iterations in \c adopt_region for using pause instructions for yielding
+    static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_pause = 16u;
+    //! Threshold of the number of loop iterations in \c adopt_region for using \c short_yield for yielding
+    static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_short_yield = 64u;
+    //! Timeout, in seconds, for performing shared memory creation/opening loop
+    static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_timeout = 60u;
+    //! The number of short yields to perform during the shared memory creation/opening loop
+    static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_short_yield_loops = 64u;
+
 public:
     //! The constructor creates a new shared memory segment
     implementation
@@ -215,7 +224,7 @@ public:
         overflow_policy oflow_policy,
         permissions const& perms
     ) :
-        m_shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
+        m_shared_memory(),
         m_region(),
         m_overflow_policy(oflow_policy),
         m_block_size_mask(0u),
@@ -223,6 +232,26 @@ public:
         m_stop(false),
         m_name(name)
     {
+        BOOST_ASSERT(block_size >= block_header::get_header_overhead());
+
+        boost::interprocess::permissions ipc_perms(perms.get_native());
+        while (true)
+        {
+            try
+            {
+                boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
+                m_shared_memory.swap(shared_memory);
+                break;
+            }
+            catch (boost::interprocess::interprocess_exception& e)
+            {
+                // shared_memory_object does not handle EINTR returned from shm_open internally.
+                // https://github.com/boostorg/interprocess/issues/152
+                if (e.get_native_error() != EINTR)
+                    throw;
+            }
+        }
+
         create_region(capacity, block_size);
     }
 
@@ -236,7 +265,7 @@ public:
         overflow_policy oflow_policy,
         permissions const& perms
     ) :
-        m_shared_memory(boost::interprocess::open_or_create, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
+        m_shared_memory(),
         m_region(),
         m_overflow_policy(oflow_policy),
         m_block_size_mask(0u),
@@ -244,11 +273,75 @@ public:
         m_stop(false),
         m_name(name)
     {
-        boost::interprocess::offset_t shmem_size = 0;
-        if (!m_shared_memory.get_size(shmem_size) || shmem_size == 0)
+        BOOST_ASSERT(block_size >= block_header::get_header_overhead());
+
+        // We need to know for certain whether we create the shared memory segment or open an existing one.
+        // This is to ensure that only one thread initializes the segment and all other threads wait until completion.
+        // Since shared_memory_object(open_or_create) constructor does not report whether the segment was actually created,
+        // we have to loop trying to create or open the segment. https://github.com/boostorg/interprocess/issues/151
+        boost::interprocess::permissions ipc_perms(perms.get_native());
+        bool created = false;
+        unsigned int i = 0u;
+        std::time_t start_time = std::time(NULL);
+        while (true)
+        {
+            while (true)
+            {
+                try
+                {
+                    boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
+                    m_shared_memory.swap(shared_memory);
+                    created = true;
+                    goto done;
+                }
+                catch (boost::interprocess::interprocess_exception& e)
+                {
+                    if (e.get_error_code() == boost::interprocess::already_exists_error)
+                        break;
+
+                    // shared_memory_object does not handle EINTR returned from shm_open internally.
+                    // https://github.com/boostorg/interprocess/issues/152
+                    if (e.get_native_error() != EINTR)
+                       throw;
+                }
+            }
+
+            while (true)
+            {
+                try
+                {
+                    boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
+                    m_shared_memory.swap(shared_memory);
+                    created = false;
+                    goto done;
+                }
+                catch (boost::interprocess::interprocess_exception& e)
+                {
+                    if (e.get_error_code() == boost::interprocess::not_found_error)
+                        break;
+
+                    if (e.get_native_error() != EINTR)
+                        throw;
+                }
+            }
+
+            std::time_t now = std::time(NULL);
+            if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
+                BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be created or opened: shared memory segment failed to be created or opened until timeout (possible livelock)");
+
+            if (i < region_open_or_create_short_yield_loops)
+                short_yield();
+            else
+                long_yield();
+
+            ++i;
+        }
+
+    done:
+        if (created)
             create_region(capacity, block_size);
         else
-            adopt_region(shmem_size);
+            adopt_region();
     }
 
     //! The constructor opens the existing shared memory segment
@@ -258,7 +351,7 @@ public:
         object_name const& name,
         overflow_policy oflow_policy
     ) :
-        m_shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write),
+        m_shared_memory(),
         m_region(),
         m_overflow_policy(oflow_policy),
         m_block_size_mask(0u),
@@ -266,11 +359,24 @@ public:
         m_stop(false),
         m_name(name)
     {
-        boost::interprocess::offset_t shmem_size = 0;
-        if (!m_shared_memory.get_size(shmem_size))
-            BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment not found");
+        while (true)
+        {
+            try
+            {
+                boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
+                m_shared_memory.swap(shared_memory);
+                break;
+            }
+            catch (boost::interprocess::interprocess_exception& e)
+            {
+                // shared_memory_object does not handle EINTR returned from shm_open internally.
+                // https://github.com/boostorg/interprocess/issues/152
+                if (e.get_native_error() != EINTR)
+                    throw;
+            }
+        }
 
-        adopt_region(shmem_size);
+        adopt_region();
     }
 
     ~implementation()
@@ -302,7 +408,7 @@ public:
         if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
             BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
 
-        if (m_stop)
+        if (m_stop.load(boost::memory_order_relaxed))
             return aborted;
 
         lock_queue();
@@ -310,7 +416,7 @@ public:
 
         while (true)
         {
-            if (m_stop)
+            if (m_stop.load(boost::memory_order_relaxed))
                 return aborted;
 
             if ((hdr->m_capacity - hdr->m_size) >= block_count)
@@ -339,13 +445,13 @@ public:
         if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
             BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
 
-        if (m_stop)
+        if (m_stop.load(boost::memory_order_relaxed))
             return false;
 
         lock_queue();
         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
 
-        if (m_stop)
+        if (m_stop.load(boost::memory_order_relaxed))
             return false;
 
         if ((hdr->m_capacity - hdr->m_size) < block_count)
@@ -358,7 +464,7 @@ public:
 
     operation_result receive(receive_handler handler, void* state)
     {
-        if (m_stop)
+        if (m_stop.load(boost::memory_order_relaxed))
             return aborted;
 
         lock_queue();
@@ -367,7 +473,7 @@ public:
 
         while (true)
         {
-            if (m_stop)
+            if (m_stop.load(boost::memory_order_relaxed))
                 return aborted;
 
             if (hdr->m_size > 0u)
@@ -383,7 +489,7 @@ public:
 
     bool try_receive(receive_handler handler, void* state)
     {
-        if (m_stop)
+        if (m_stop.load(boost::memory_order_relaxed))
             return false;
 
         lock_queue();
@@ -400,14 +506,14 @@ public:
 
     void stop_local()
     {
-        if (m_stop)
+        if (m_stop.load(boost::memory_order_relaxed))
             return;
 
         lock_queue();
         header* const hdr = get_header();
         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
 
-        m_stop = true;
+        m_stop.store(true, boost::memory_order_relaxed);
 
         hdr->m_nonempty_queue.notify_all();
         hdr->m_nonfull_queue.notify_all();
@@ -415,7 +521,7 @@ public:
 
     void reset_local()
     {
-        m_stop = false;
+        m_stop.store(false, boost::memory_order_relaxed);
     }
 
     void clear()
@@ -448,17 +554,48 @@ private:
         init_block_size(block_size);
     }
 
-    void adopt_region(std::size_t shmem_size)
+    void adopt_region()
     {
-        if (shmem_size < sizeof(header))
+        std::size_t shmem_size = 0u;
+        unsigned int i = 0u;
+        std::time_t start_time = std::time(NULL);
+        while (true)
+        {
+            boost::interprocess::offset_t shm_size = 0;
+            const bool get_size_result = m_shared_memory.get_size(shm_size);
+            if (BOOST_LIKELY(get_size_result && shm_size > 0))
+            {
+                shmem_size = static_cast< std::size_t >(shm_size);
+                break;
+            }
+
+            std::time_t now = std::time(NULL);
+            if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
+            {
+                if (get_size_result)
+                    goto shmem_size_too_small;
+                BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size could not be determined until timeout");
+            }
+
+            if (i < region_open_or_create_short_yield_loops)
+                short_yield();
+            else
+                long_yield();
+
+            ++i;
+        }
+
+        if (BOOST_UNLIKELY(shmem_size < sizeof(header)))
+        {
+        shmem_size_too_small:
             BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
+        }
 
-        boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
+        boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0, shmem_size).swap(m_region);
 
         // Wait until the mapped region becomes initialized
         header* const hdr = get_header();
-        BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 200u, spin_loops = 16u, spins = 16u;
-        for (unsigned int i = 0; i < wait_loops; ++i)
+        for (i = 0u; i < region_init_wait_loops; ++i)
         {
             uint32_t ref_count = hdr->m_ref_count.load(boost::memory_order_acquire);
             while (ref_count > 0u)
@@ -467,28 +604,12 @@ private:
                     goto done;
             }
 
-            if (i < spin_loops)
-            {
-                for (unsigned int j = 0; j < spins; ++j)
-                {
-                    boost::log::aux::pause();
-                }
-            }
+            if (i < region_init_wait_loops_pause)
+                boost::log::aux::pause();
+            else if (i < region_init_wait_loops_short_yield)
+                short_yield();
             else
-            {
-#if defined(BOOST_HAS_SCHED_YIELD)
-                sched_yield();
-#elif defined(BOOST_HAS_PTHREAD_YIELD)
-                pthread_yield();
-#elif defined(BOOST_HAS_NANOSLEEP)
-                timespec ts = {};
-                ts.tv_sec = 0;
-                ts.tv_nsec = 1000;
-                nanosleep(&ts, NULL);
-#else
-                usleep(1);
-#endif
-            }
+                long_yield();
         }
 
         BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
@@ -614,6 +735,7 @@ private:
         const uint32_t capacity = hdr->m_capacity;
         const size_type block_size = hdr->m_block_size;
         uint32_t pos = hdr->m_put_pos;
+        BOOST_ASSERT(pos < capacity);
 
         block_header* block = hdr->get_block(pos);
         block->m_size = message_size;
@@ -648,6 +770,7 @@ private:
         const uint32_t capacity = hdr->m_capacity;
         const size_type block_size = hdr->m_block_size;
         uint32_t pos = hdr->m_get_pos;
+        BOOST_ASSERT(pos < capacity);
 
         block_header* block = hdr->get_block(pos);
         size_type message_size = block->m_size;
@@ -673,6 +796,29 @@ private:
 
         hdr->m_nonfull_queue.notify_all();
     }
+
+    static void short_yield() BOOST_NOEXCEPT
+    {
+#if defined(BOOST_HAS_SCHED_YIELD)
+        sched_yield();
+#elif defined(BOOST_HAS_PTHREAD_YIELD)
+        pthread_yield();
+#else
+        long_yield();
+#endif
+    }
+
+    static void long_yield() BOOST_NOEXCEPT
+    {
+#if defined(BOOST_HAS_NANOSLEEP)
+        timespec ts = {};
+        ts.tv_sec = 0;
+        ts.tv_nsec = 1000;
+        nanosleep(&ts, NULL);
+#else
+        usleep(1);
+#endif
+    }
 };
 
 BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)