#error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
#endif
-#include <boost/bind/bind.hpp>
#include <boost/static_assert.hpp>
#include <boost/memory_order.hpp>
#include <boost/atomic/atomic.hpp>
#ifndef BOOST_LOG_DOXYGEN_PASS
-#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(n, data)\
+#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
template< typename T0 >\
explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
base_type(true),\
queue_base_type(arg0),\
m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
+ m_ActiveOperation(idle),\
m_StopRequested(false),\
m_FlushRequested(false)\
{\
base_type(true),\
queue_base_type(arg0),\
m_pBackend(backend),\
+ m_ActiveOperation(idle),\
m_StopRequested(false),\
m_FlushRequested(false)\
{\
start_feeding_thread();\
}
-#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(n, data)\
- template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
- explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
+#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
+ template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
+ explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
base_type(true),\
- queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
- m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
+ queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
+ m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
+ m_ActiveOperation(idle),\
m_StopRequested(false),\
m_FlushRequested(false)\
{\
- if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
+ if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
start_feeding_thread();\
}\
- template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
- explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
+ template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
+ explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
base_type(true),\
- queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
+ queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
m_pBackend(backend),\
+ m_ActiveOperation(idle),\
m_StopRequested(false),\
m_FlushRequested(false)\
{\
- if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
+ if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
start_feeding_thread();\
}
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
- BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(n, data)
+ BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
#endif // BOOST_LOG_DOXYGEN_PASS
* \brief Asynchronous logging sink frontend
*
* The frontend starts a separate thread on construction. All logging records are passed
- * to the backend in this dedicated thread only.
+ * to the backend in this dedicated thread.
+ *
+ * The user can prevent spawning the internal thread by specifying \c start_thread parameter
+ * with the value of \c false on construction. In this case log records will be buffered
+ * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
+ * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
+ * parameter.
*/
template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
class asynchronous_sink :
//! Frontend synchronization mutex type
typedef typename base_type::mutex_type frontend_mutex_type;
- //! A scope guard that implements thread ID management
- class scoped_thread_id
+ //! Operation bit mask
+ enum operation
+ {
+ idle = 0u,
+ feeding_records = 1u,
+ flushing = 3u
+ };
+
+ //! Function object to run the log record feeding thread
+ class run_func
{
+ public:
+ typedef void result_type;
+
private:
- frontend_mutex_type& m_Mutex;
- condition_variable_any& m_Cond;
- thread::id& m_ThreadID;
- boost::atomic< bool >& m_StopRequested;
+ asynchronous_sink* m_self;
public:
- //! Initializing constructor
- scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
- : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
+ explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
{
- lock_guard< frontend_mutex_type > lock(m_Mutex);
- if (m_ThreadID != thread::id())
- BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
- m_ThreadID = this_thread::get_id();
}
+
+ result_type operator()() const
+ {
+ m_self->run();
+ }
+ };
+
+ //! A scope guard that implements active operation management
+ class scoped_feeding_opereation
+ {
+ private:
+ asynchronous_sink& m_self;
+
+ public:
//! Initializing constructor
- scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
- : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
+ explicit scoped_feeding_opereation(asynchronous_sink& self) : m_self(self)
{
- unique_lock< frontend_mutex_type > lock(move(l));
- if (m_ThreadID != thread::id())
- BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
- m_ThreadID = this_thread::get_id();
}
//! Destructor
- ~scoped_thread_id()
+ ~scoped_feeding_opereation()
{
- try
- {
- lock_guard< frontend_mutex_type > lock(m_Mutex);
- m_StopRequested.store(false, boost::memory_order_release);
- m_ThreadID = thread::id();
- m_Cond.notify_all();
- }
- catch (...)
- {
- }
+ m_self.complete_feeding_operation();
}
- BOOST_DELETED_FUNCTION(scoped_thread_id(scoped_thread_id const&))
- BOOST_DELETED_FUNCTION(scoped_thread_id& operator= (scoped_thread_id const&))
+ BOOST_DELETED_FUNCTION(scoped_feeding_opereation(scoped_feeding_opereation const&))
+ BOOST_DELETED_FUNCTION(scoped_feeding_opereation& operator= (scoped_feeding_opereation const&))
};
//! A scope guard that resets a flag on destructor
try
{
lock_guard< frontend_mutex_type > lock(m_Mutex);
- m_Flag.store(false, boost::memory_order_release);
+ m_Flag.store(false, boost::memory_order_relaxed);
m_Cond.notify_all();
}
catch (...)
//! Dedicated record feeding thread
thread m_DedicatedFeedingThread;
- //! Feeding thread ID
- thread::id m_FeedingThreadID;
//! Condition variable to implement blocking operations
condition_variable_any m_BlockCond;
+ //! Currently active operation
+ operation m_ActiveOperation;
//! The flag indicates that the feeding loop has to be stopped
boost::atomic< bool > m_StopRequested;
//! The flag indicates that queue flush has been requested
* \param start_thread If \c true, the frontend creates a thread to feed
* log records to the backend. Otherwise no thread is
* started and it is assumed that the user will call
- * either \c run or \c feed_records himself.
+ * \c run, \c feed_records or \c flush himself.
*/
- asynchronous_sink(bool start_thread = true) :
+ explicit asynchronous_sink(bool start_thread = true) :
base_type(true),
m_pBackend(boost::make_shared< sink_backend_type >()),
+ m_ActiveOperation(idle),
m_StopRequested(false),
m_FlushRequested(false)
{
* \param start_thread If \c true, the frontend creates a thread to feed
* log records to the backend. Otherwise no thread is
* started and it is assumed that the user will call
- * either \c run or \c feed_records himself.
+ * \c run, \c feed_records or \c flush himself.
*
* \pre \a backend is not \c NULL.
*/
explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
base_type(true),
m_pBackend(backend),
+ m_ActiveOperation(idle),
m_StopRequested(false),
m_FlushRequested(false)
{
* \li start_thread - If \c true, the frontend creates a thread to feed
* log records to the backend. Otherwise no thread is
* started and it is assumed that the user will call
- * either \c run or \c feed_records himself.
+ * \c run, \c feed_records or \c flush himself.
*/
#ifndef BOOST_LOG_DOXYGEN_PASS
BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
/*!
* Destructor. Implicitly stops the dedicated feeding thread, if one is running.
*/
- ~asynchronous_sink() BOOST_NOEXCEPT
+ ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
{
try
{
/*!
* Enqueues the log record to the backend
*/
- void consume(record_view const& rec)
+ void consume(record_view const& rec) BOOST_OVERRIDE
{
if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
{
/*!
* The method attempts to pass logging record to the backend
*/
- bool try_consume(record_view const& rec)
+ bool try_consume(record_view const& rec) BOOST_OVERRIDE
{
if (!m_FlushRequested.load(boost::memory_order_acquire))
{
void run()
{
// First check that no other thread is running
- scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
+ {
+ unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
+ if (start_feeding_operation(lock, feeding_records))
+ return;
+ }
+
+ scoped_feeding_opereation guard(*this);
// Now start the feeding loop
while (true)
}
/*!
- * The method softly interrupts record feeding loop. This method must be called when the \c run
- * method execution has to be interrupted. Unlike regular thread interruption, calling
- * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
- * will attempt to finish its business with the record in progress and return afterwards.
- * This method can be called either if the sink was created with a dedicated thread,
- * or if the feeding loop was initiated by user.
+ * The method softly interrupts record feeding loop. This method must be called when \c run,
+ * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
+ * interruption, calling \c stop will not interrupt the record processing in the middle.
+ * Instead, the sink frontend will attempt to finish its business with the record in progress
+ * and return afterwards. This method can be called either if the sink was created with
+ * an internal dedicated thread, or if the feeding loop was initiated by user.
+ *
+ * If no record feeding operation is in progress, calling \c stop marks the sink frontend
+ * so that the next feeding operation stops immediately.
*
* \note Returning from this method does not guarantee that there are no records left buffered
* in the sink frontend. It is possible that log records keep coming during and after this
* method is called. At some point of execution of this method log records stop being processed,
* and all records that come after this point are put into the queue. These records will be
* processed upon further calls to \c run or \c feed_records.
+ *
+ * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
+ * as \c false on frontend construction), this method does not guarantee that upon return the thread
+ * has returned from the record feeding loop or that it won't enter it in the future. The method
+ * only ensures that the record feeding thread will eventually return from the feeding loop. It is
+ * user's responsibility to synchronize with the user's record feeding thread.
*/
void stop()
{
- unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
- if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
+ boost::thread feeding_thread;
{
- try
- {
- m_StopRequested.store(true, boost::memory_order_release);
- queue_base_type::interrupt_dequeue();
- while (m_StopRequested.load(boost::memory_order_acquire))
- m_BlockCond.wait(lock);
- }
- catch (...)
- {
- m_StopRequested.store(false, boost::memory_order_release);
- throw;
- }
+ lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
+
+ m_StopRequested.store(true, boost::memory_order_release);
+ queue_base_type::interrupt_dequeue();
- lock.unlock();
- m_DedicatedFeedingThread.join();
+ m_DedicatedFeedingThread.swap(feeding_thread);
}
+
+ if (feeding_thread.joinable())
+ feeding_thread.join();
}
/*!
void feed_records()
{
// First check that no other thread is running
- scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
+ {
+ unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
+ if (start_feeding_operation(lock, feeding_records))
+ return;
+ }
+
+ scoped_feeding_opereation guard(*this);
// Now start the feeding loop
do_feed_records();
/*!
* The method feeds all log records that may have been buffered to the backend and returns.
* Unlike \c feed_records, in case of ordering queueing the method also feeds records
- * that were enqueued during the ordering window, attempting to empty the queue completely.
+ * that were enqueued during the ordering window, attempting to drain the queue completely.
*/
- void flush()
+ void flush() BOOST_OVERRIDE
{
- unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
- if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
{
- // There is already a thread feeding records, let it do the job
- m_FlushRequested.store(true, boost::memory_order_release);
- queue_base_type::interrupt_dequeue();
- while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
- m_BlockCond.wait(lock);
+ unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
+ if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
+ {
+ // There is already a thread feeding records, let it do the job
+ m_FlushRequested.store(true, boost::memory_order_release);
+ queue_base_type::interrupt_dequeue();
+ while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
+ m_BlockCond.wait(lock);
- // The condition may have been signalled when the feeding thread was finishing.
- // In that case records may not have been flushed, and we do the flush ourselves.
- if (m_FeedingThreadID != thread::id())
- return;
- }
+ // The condition may have been signalled when the feeding operation was finishing.
+ // In that case records may not have been flushed, and we do the flush ourselves.
+ if (m_ActiveOperation != idle)
+ return;
+ }
- m_FlushRequested.store(true, boost::memory_order_release);
+ m_ActiveOperation = flushing;
+ m_FlushRequested.store(true, boost::memory_order_relaxed);
+ }
- // Flush records ourselves. The guard releases the lock.
- scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
+ scoped_feeding_opereation guard(*this);
do_feed_records();
}
//! The method spawns record feeding thread
void start_feeding_thread()
{
- boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
+ boost::thread(run_func(this)).swap(m_DedicatedFeedingThread);
+ }
+
+ //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
+ bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
+ {
+ while (m_ActiveOperation != idle)
+ {
+ if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
+ BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
+
+ if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
+ {
+ m_StopRequested.store(false, boost::memory_order_relaxed);
+ return true;
+ }
+
+ m_BlockCond.wait(lock);
+ }
+
+ m_ActiveOperation = op;
+
+ return false;
+ }
+
+ //! Completes record feeding operation
+ void complete_feeding_operation() BOOST_NOEXCEPT
+ {
+ try
+ {
+ lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
+ m_ActiveOperation = idle;
+ m_StopRequested.store(false, boost::memory_order_relaxed);
+ m_BlockCond.notify_all();
+ }
+ catch (...)
+ {
+ }
}
//! The record feeding loop