]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/boost/log/sinks/async_frontend.hpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / boost / log / sinks / async_frontend.hpp
index d7bfd30aed83ac378ca673a6b6d6b1d064346c10..a329ce7a243e77994ac356dd8361a07e39960c5e 100644 (file)
@@ -26,7 +26,6 @@
 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
 #endif
 
-#include <boost/bind/bind.hpp>
 #include <boost/static_assert.hpp>
 #include <boost/memory_order.hpp>
 #include <boost/atomic/atomic.hpp>
@@ -56,12 +55,13 @@ namespace sinks {
 
 #ifndef BOOST_LOG_DOXYGEN_PASS
 
-#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(n, data)\
+#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
     template< typename T0 >\
     explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
         base_type(true),\
         queue_base_type(arg0),\
         m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
+        m_ActiveOperation(idle),\
         m_StopRequested(false),\
         m_FlushRequested(false)\
     {\
@@ -73,6 +73,7 @@ namespace sinks {
         base_type(true),\
         queue_base_type(arg0),\
         m_pBackend(backend),\
+        m_ActiveOperation(idle),\
         m_StopRequested(false),\
         m_FlushRequested(false)\
     {\
@@ -80,32 +81,34 @@ namespace sinks {
             start_feeding_thread();\
     }
 
-#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(n, data)\
-    template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
-    explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
+#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
+    template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
+    explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
         base_type(true),\
-        queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
-        m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
+        queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
+        m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
+        m_ActiveOperation(idle),\
         m_StopRequested(false),\
         m_FlushRequested(false)\
     {\
-        if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
+        if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
             start_feeding_thread();\
     }\
-    template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
-    explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
+    template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
+    explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
         base_type(true),\
-        queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
+        queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
         m_pBackend(backend),\
+        m_ActiveOperation(idle),\
         m_StopRequested(false),\
         m_FlushRequested(false)\
     {\
-        if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
+        if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
             start_feeding_thread();\
     }
 
 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
-    BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(n, data)
+    BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
 
 #endif // BOOST_LOG_DOXYGEN_PASS
 
@@ -113,7 +116,13 @@ namespace sinks {
  * \brief Asynchronous logging sink frontend
  *
  * The frontend starts a separate thread on construction. All logging records are passed
- * to the backend in this dedicated thread only.
+ * to the backend in this dedicated thread.
+ *
+ * The user can prevent spawning the internal thread by specifying \c start_thread parameter
+ * with the value of \c false on construction. In this case log records will be buffered
+ * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
+ * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
+ * parameter.
  */
 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
 class asynchronous_sink :
@@ -129,51 +138,53 @@ private:
     //! Frontend synchronization mutex type
     typedef typename base_type::mutex_type frontend_mutex_type;
 
-    //! A scope guard that implements thread ID management
-    class scoped_thread_id
+    //! Operation bit mask
+    enum operation
+    {
+        idle = 0u,
+        feeding_records = 1u,
+        flushing = 3u
+    };
+
+    //! Function object to run the log record feeding thread
+    class run_func
     {
+    public:
+        typedef void result_type;
+
     private:
-        frontend_mutex_type& m_Mutex;
-        condition_variable_any& m_Cond;
-        thread::id& m_ThreadID;
-        boost::atomic< bool >& m_StopRequested;
+        asynchronous_sink* m_self;
 
     public:
-        //! Initializing constructor
-        scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
-            : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
+        explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
         {
-            lock_guard< frontend_mutex_type > lock(m_Mutex);
-            if (m_ThreadID != thread::id())
-                BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
-            m_ThreadID = this_thread::get_id();
         }
+
+        result_type operator()() const
+        {
+            m_self->run();
+        }
+    };
+
+    //! A scope guard that implements active operation management
+    class scoped_feeding_opereation
+    {
+    private:
+        asynchronous_sink& m_self;
+
+    public:
         //! Initializing constructor
-        scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
-            : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
+        explicit scoped_feeding_opereation(asynchronous_sink& self) : m_self(self)
         {
-            unique_lock< frontend_mutex_type > lock(move(l));
-            if (m_ThreadID != thread::id())
-                BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
-            m_ThreadID = this_thread::get_id();
         }
         //! Destructor
-        ~scoped_thread_id()
+        ~scoped_feeding_opereation()
         {
-            try
-            {
-                lock_guard< frontend_mutex_type > lock(m_Mutex);
-                m_StopRequested.store(false, boost::memory_order_release);
-                m_ThreadID = thread::id();
-                m_Cond.notify_all();
-            }
-            catch (...)
-            {
-            }
+            m_self.complete_feeding_operation();
         }
 
-        BOOST_DELETED_FUNCTION(scoped_thread_id(scoped_thread_id const&))
-        BOOST_DELETED_FUNCTION(scoped_thread_id& operator= (scoped_thread_id const&))
+        BOOST_DELETED_FUNCTION(scoped_feeding_opereation(scoped_feeding_opereation const&))
+        BOOST_DELETED_FUNCTION(scoped_feeding_opereation& operator= (scoped_feeding_opereation const&))
     };
 
     //! A scope guard that resets a flag on destructor
@@ -194,7 +205,7 @@ private:
             try
             {
                 lock_guard< frontend_mutex_type > lock(m_Mutex);
-                m_Flag.store(false, boost::memory_order_release);
+                m_Flag.store(false, boost::memory_order_relaxed);
                 m_Cond.notify_all();
             }
             catch (...)
@@ -233,11 +244,11 @@ private:
 
     //! Dedicated record feeding thread
     thread m_DedicatedFeedingThread;
-    //! Feeding thread ID
-    thread::id m_FeedingThreadID;
     //! Condition variable to implement blocking operations
     condition_variable_any m_BlockCond;
 
+    //! Currently active operation
+    operation m_ActiveOperation;
     //! The flag indicates that the feeding loop has to be stopped
     boost::atomic< bool > m_StopRequested;
     //! The flag indicates that queue flush has been requested
@@ -251,11 +262,12 @@ public:
      * \param start_thread If \c true, the frontend creates a thread to feed
      *                     log records to the backend. Otherwise no thread is
      *                     started and it is assumed that the user will call
-     *                     either \c run or \c feed_records himself.
+     *                     \c run, \c feed_records or \c flush himself.
      */
-    asynchronous_sink(bool start_thread = true) :
+    explicit asynchronous_sink(bool start_thread = true) :
         base_type(true),
         m_pBackend(boost::make_shared< sink_backend_type >()),
+        m_ActiveOperation(idle),
         m_StopRequested(false),
         m_FlushRequested(false)
     {
@@ -269,13 +281,14 @@ public:
      * \param start_thread If \c true, the frontend creates a thread to feed
      *                     log records to the backend. Otherwise no thread is
      *                     started and it is assumed that the user will call
-     *                     either \c run or \c feed_records himself.
+     *                     \c run, \c feed_records or \c flush himself.
      *
      * \pre \a backend is not \c NULL.
      */
     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
         base_type(true),
         m_pBackend(backend),
+        m_ActiveOperation(idle),
         m_StopRequested(false),
         m_FlushRequested(false)
     {
@@ -292,7 +305,7 @@ public:
      *   \li start_thread - If \c true, the frontend creates a thread to feed
      *                      log records to the backend. Otherwise no thread is
      *                      started and it is assumed that the user will call
-     *                      either \c run or \c feed_records himself.
+     *                      \c run, \c feed_records or \c flush himself.
      */
 #ifndef BOOST_LOG_DOXYGEN_PASS
     BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
@@ -304,7 +317,7 @@ public:
     /*!
      * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
      */
-    ~asynchronous_sink() BOOST_NOEXCEPT
+    ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
     {
         try
         {
@@ -328,7 +341,7 @@ public:
     /*!
      * Enqueues the log record to the backend
      */
-    void consume(record_view const& rec)
+    void consume(record_view const& rec) BOOST_OVERRIDE
     {
         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
         {
@@ -343,7 +356,7 @@ public:
     /*!
      * The method attempts to pass logging record to the backend
      */
-    bool try_consume(record_view const& rec)
+    bool try_consume(record_view const& rec) BOOST_OVERRIDE
     {
         if (!m_FlushRequested.load(boost::memory_order_acquire))
         {
@@ -365,7 +378,13 @@ public:
     void run()
     {
         // First check that no other thread is running
-        scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
+        {
+            unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
+            if (start_feeding_operation(lock, feeding_records))
+                return;
+        }
+
+        scoped_feeding_opereation guard(*this);
 
         // Now start the feeding loop
         while (true)
@@ -384,40 +403,42 @@ public:
     }
 
     /*!
-     * The method softly interrupts record feeding loop. This method must be called when the \c run
-     * method execution has to be interrupted. Unlike regular thread interruption, calling
-     * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
-     * will attempt to finish its business with the record in progress and return afterwards.
-     * This method can be called either if the sink was created with a dedicated thread,
-     * or if the feeding loop was initiated by user.
+     * The method softly interrupts record feeding loop. This method must be called when \c run,
+     * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
+     * interruption, calling \c stop will not interrupt the record processing in the middle.
+     * Instead, the sink frontend will attempt to finish its business with the record in progress
+     * and return afterwards. This method can be called either if the sink was created with
+     * an internal dedicated thread, or if the feeding loop was initiated by user.
+     *
+     * If no record feeding operation is in progress, calling \c stop marks the sink frontend
+     * so that the next feeding operation stops immediately.
      *
      * \note Returning from this method does not guarantee that there are no records left buffered
      *       in the sink frontend. It is possible that log records keep coming during and after this
      *       method is called. At some point of execution of this method log records stop being processed,
      *       and all records that come after this point are put into the queue. These records will be
      *       processed upon further calls to \c run or \c feed_records.
+     *
+     * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
+     *       as \c false on frontend construction), this method does not guarantee that upon return the thread
+     *       has returned from the record feeding loop or that it won't enter it in the future. The method
+     *       only ensures that the record feeding thread will eventually return from the feeding loop. It is
+     *       user's responsibility to synchronize with the user's record feeding thread.
      */
     void stop()
     {
-        unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
-        if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
+        boost::thread feeding_thread;
         {
-            try
-            {
-                m_StopRequested.store(true, boost::memory_order_release);
-                queue_base_type::interrupt_dequeue();
-                while (m_StopRequested.load(boost::memory_order_acquire))
-                    m_BlockCond.wait(lock);
-            }
-            catch (...)
-            {
-                m_StopRequested.store(false, boost::memory_order_release);
-                throw;
-            }
+            lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
+
+            m_StopRequested.store(true, boost::memory_order_release);
+            queue_base_type::interrupt_dequeue();
 
-            lock.unlock();
-            m_DedicatedFeedingThread.join();
+            m_DedicatedFeedingThread.swap(feeding_thread);
         }
+
+        if (feeding_thread.joinable())
+            feeding_thread.join();
     }
 
     /*!
@@ -428,7 +449,13 @@ public:
     void feed_records()
     {
         // First check that no other thread is running
-        scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
+        {
+            unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
+            if (start_feeding_operation(lock, feeding_records))
+                return;
+        }
+
+        scoped_feeding_opereation guard(*this);
 
         // Now start the feeding loop
         do_feed_records();
@@ -437,29 +464,31 @@ public:
     /*!
      * The method feeds all log records that may have been buffered to the backend and returns.
      * Unlike \c feed_records, in case of ordering queueing the method also feeds records
-     * that were enqueued during the ordering window, attempting to empty the queue completely.
+     * that were enqueued during the ordering window, attempting to drain the queue completely.
      */
-    void flush()
+    void flush() BOOST_OVERRIDE
     {
-        unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
-        if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
         {
-            // There is already a thread feeding records, let it do the job
-            m_FlushRequested.store(true, boost::memory_order_release);
-            queue_base_type::interrupt_dequeue();
-            while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
-                m_BlockCond.wait(lock);
+            unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
+            if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
+            {
+                // There is already a thread feeding records, let it do the job
+                m_FlushRequested.store(true, boost::memory_order_release);
+                queue_base_type::interrupt_dequeue();
+                while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
+                    m_BlockCond.wait(lock);
 
-            // The condition may have been signalled when the feeding thread was finishing.
-            // In that case records may not have been flushed, and we do the flush ourselves.
-            if (m_FeedingThreadID != thread::id())
-                return;
-        }
+                // The condition may have been signalled when the feeding operation was finishing.
+                // In that case records may not have been flushed, and we do the flush ourselves.
+                if (m_ActiveOperation != idle)
+                    return;
+            }
 
-        m_FlushRequested.store(true, boost::memory_order_release);
+            m_ActiveOperation = flushing;
+            m_FlushRequested.store(true, boost::memory_order_relaxed);
+        }
 
-        // Flush records ourselves. The guard releases the lock.
-        scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
+        scoped_feeding_opereation guard(*this);
 
         do_feed_records();
     }
@@ -469,7 +498,44 @@ private:
     //! The method spawns record feeding thread
     void start_feeding_thread()
     {
-        boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
+        boost::thread(run_func(this)).swap(m_DedicatedFeedingThread);
+    }
+
+    //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
+    bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
+    {
+        while (m_ActiveOperation != idle)
+        {
+            if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
+                BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
+
+            if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
+            {
+                m_StopRequested.store(false, boost::memory_order_relaxed);
+                return true;
+            }
+
+            m_BlockCond.wait(lock);
+        }
+
+        m_ActiveOperation = op;
+
+        return false;
+    }
+
+    //! Completes record feeding operation
+    void complete_feeding_operation() BOOST_NOEXCEPT
+    {
+        try
+        {
+            lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
+            m_ActiveOperation = idle;
+            m_StopRequested.store(false, boost::memory_order_relaxed);
+            m_BlockCond.notify_all();
+        }
+        catch (...)
+        {
+        }
     }
 
     //! The record feeding loop