]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/log/include/boost/log/sinks/async_frontend.hpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / log / include / boost / log / sinks / async_frontend.hpp
1 /*
2 * Copyright Andrey Semashev 2007 - 2015.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7 /*!
8 * \file async_frontend.hpp
9 * \author Andrey Semashev
10 * \date 14.07.2009
11 *
12 * The header contains implementation of asynchronous sink frontend.
13 */
14
15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
17
18 #include <exception> // std::terminate
19 #include <boost/log/detail/config.hpp>
20
21 #ifdef BOOST_HAS_PRAGMA_ONCE
22 #pragma once
23 #endif
24
25 #if defined(BOOST_LOG_NO_THREADS)
26 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
27 #endif
28
29 #include <boost/bind.hpp>
30 #include <boost/static_assert.hpp>
31 #include <boost/memory_order.hpp>
32 #include <boost/atomic/atomic.hpp>
33 #include <boost/smart_ptr/shared_ptr.hpp>
34 #include <boost/smart_ptr/make_shared_object.hpp>
35 #include <boost/preprocessor/control/if.hpp>
36 #include <boost/preprocessor/comparison/equal.hpp>
37 #include <boost/thread/locks.hpp>
38 #include <boost/thread/recursive_mutex.hpp>
39 #include <boost/thread/thread.hpp>
40 #include <boost/thread/condition_variable.hpp>
41 #include <boost/log/exceptions.hpp>
42 #include <boost/log/detail/locking_ptr.hpp>
43 #include <boost/log/detail/parameter_tools.hpp>
44 #include <boost/log/core/record_view.hpp>
45 #include <boost/log/sinks/basic_sink_frontend.hpp>
46 #include <boost/log/sinks/frontend_requirements.hpp>
47 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
48 #include <boost/log/keywords/start_thread.hpp>
49 #include <boost/log/detail/header.hpp>
50
51 namespace boost {
52
53 BOOST_LOG_OPEN_NAMESPACE
54
55 namespace sinks {
56
57 #ifndef BOOST_LOG_DOXYGEN_PASS
58
59 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(n, data)\
60 template< typename T0 >\
61 explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
62 base_type(true),\
63 queue_base_type(arg0),\
64 m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
65 m_StopRequested(false),\
66 m_FlushRequested(false)\
67 {\
68 if (arg0[keywords::start_thread | true])\
69 start_feeding_thread();\
70 }\
71 template< typename T0 >\
72 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
73 base_type(true),\
74 queue_base_type(arg0),\
75 m_pBackend(backend),\
76 m_StopRequested(false),\
77 m_FlushRequested(false)\
78 {\
79 if (arg0[keywords::start_thread | true])\
80 start_feeding_thread();\
81 }
82
83 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(n, data)\
84 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
85 explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
86 base_type(true),\
87 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
88 m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
89 m_StopRequested(false),\
90 m_FlushRequested(false)\
91 {\
92 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
93 start_feeding_thread();\
94 }\
95 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
96 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
97 base_type(true),\
98 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
99 m_pBackend(backend),\
100 m_StopRequested(false),\
101 m_FlushRequested(false)\
102 {\
103 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
104 start_feeding_thread();\
105 }
106
107 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
108 BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(n, data)
109
110 #endif // BOOST_LOG_DOXYGEN_PASS
111
112 /*!
113 * \brief Asynchronous logging sink frontend
114 *
115 * The frontend starts a separate thread on construction. All logging records are passed
116 * to the backend in this dedicated thread only.
117 */
118 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
119 class asynchronous_sink :
120 public aux::make_sink_frontend_base< SinkBackendT >::type,
121 public QueueingStrategyT
122 {
123 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
124 typedef QueueingStrategyT queue_base_type;
125
126 private:
127 //! Backend synchronization mutex type
128 typedef boost::recursive_mutex backend_mutex_type;
129 //! Frontend synchronization mutex type
130 typedef typename base_type::mutex_type frontend_mutex_type;
131
132 //! A scope guard that implements thread ID management
133 class scoped_thread_id
134 {
135 private:
136 frontend_mutex_type& m_Mutex;
137 condition_variable_any& m_Cond;
138 thread::id& m_ThreadID;
139 boost::atomic< bool >& m_StopRequested;
140
141 public:
142 //! Initializing constructor
143 scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
144 : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
145 {
146 lock_guard< frontend_mutex_type > lock(m_Mutex);
147 if (m_ThreadID != thread::id())
148 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
149 m_ThreadID = this_thread::get_id();
150 }
151 //! Initializing constructor
152 scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
153 : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
154 {
155 unique_lock< frontend_mutex_type > lock(move(l));
156 if (m_ThreadID != thread::id())
157 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
158 m_ThreadID = this_thread::get_id();
159 }
160 //! Destructor
161 ~scoped_thread_id()
162 {
163 try
164 {
165 lock_guard< frontend_mutex_type > lock(m_Mutex);
166 m_StopRequested.store(false, boost::memory_order_release);
167 m_ThreadID = thread::id();
168 m_Cond.notify_all();
169 }
170 catch (...)
171 {
172 }
173 }
174
175 BOOST_DELETED_FUNCTION(scoped_thread_id(scoped_thread_id const&))
176 BOOST_DELETED_FUNCTION(scoped_thread_id& operator= (scoped_thread_id const&))
177 };
178
179 //! A scope guard that resets a flag on destructor
180 class scoped_flag
181 {
182 private:
183 frontend_mutex_type& m_Mutex;
184 condition_variable_any& m_Cond;
185 boost::atomic< bool >& m_Flag;
186
187 public:
188 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
189 m_Mutex(mut), m_Cond(cond), m_Flag(f)
190 {
191 }
192 ~scoped_flag()
193 {
194 try
195 {
196 lock_guard< frontend_mutex_type > lock(m_Mutex);
197 m_Flag.store(false, boost::memory_order_release);
198 m_Cond.notify_all();
199 }
200 catch (...)
201 {
202 }
203 }
204
205 BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
206 BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
207 };
208
209 public:
210 //! Sink implementation type
211 typedef SinkBackendT sink_backend_type;
212 //! \cond
213 BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
214 //! \endcond
215
216 #ifndef BOOST_LOG_DOXYGEN_PASS
217
218 //! A pointer type that locks the backend until it's destroyed
219 typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
220
221 #else // BOOST_LOG_DOXYGEN_PASS
222
223 //! A pointer type that locks the backend until it's destroyed
224 typedef implementation_defined locked_backend_ptr;
225
226 #endif // BOOST_LOG_DOXYGEN_PASS
227
228 private:
229 //! Synchronization mutex
230 backend_mutex_type m_BackendMutex;
231 //! Pointer to the backend
232 const shared_ptr< sink_backend_type > m_pBackend;
233
234 //! Dedicated record feeding thread
235 thread m_DedicatedFeedingThread;
236 //! Feeding thread ID
237 thread::id m_FeedingThreadID;
238 //! Condition variable to implement blocking operations
239 condition_variable_any m_BlockCond;
240
241 //! The flag indicates that the feeding loop has to be stopped
242 boost::atomic< bool > m_StopRequested;
243 //! The flag indicates that queue flush has been requested
244 boost::atomic< bool > m_FlushRequested;
245
246 public:
247 /*!
248 * Default constructor. Constructs the sink backend instance.
249 * Requires the backend to be default-constructible.
250 *
251 * \param start_thread If \c true, the frontend creates a thread to feed
252 * log records to the backend. Otherwise no thread is
253 * started and it is assumed that the user will call
254 * either \c run or \c feed_records himself.
255 */
256 asynchronous_sink(bool start_thread = true) :
257 base_type(true),
258 m_pBackend(boost::make_shared< sink_backend_type >()),
259 m_StopRequested(false),
260 m_FlushRequested(false)
261 {
262 if (start_thread)
263 start_feeding_thread();
264 }
265 /*!
266 * Constructor attaches user-constructed backend instance
267 *
268 * \param backend Pointer to the backend instance.
269 * \param start_thread If \c true, the frontend creates a thread to feed
270 * log records to the backend. Otherwise no thread is
271 * started and it is assumed that the user will call
272 * either \c run or \c feed_records himself.
273 *
274 * \pre \a backend is not \c NULL.
275 */
276 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
277 base_type(true),
278 m_pBackend(backend),
279 m_StopRequested(false),
280 m_FlushRequested(false)
281 {
282 if (start_thread)
283 start_feeding_thread();
284 }
285
286 /*!
287 * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor.
288 * Refer to the backend documentation for the list of supported parameters.
289 *
290 * The frontend uses the following named parameters:
291 *
292 * \li start_thread - If \c true, the frontend creates a thread to feed
293 * log records to the backend. Otherwise no thread is
294 * started and it is assumed that the user will call
295 * either \c run or \c feed_records himself.
296 */
297 #ifndef BOOST_LOG_DOXYGEN_PASS
298 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
299 #else
300 template< typename... Args >
301 explicit asynchronous_sink(Args&&... args);
302 #endif
303
304 /*!
305 * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
306 */
307 ~asynchronous_sink() BOOST_NOEXCEPT
308 {
309 try
310 {
311 boost::this_thread::disable_interruption no_interrupts;
312 stop();
313 }
314 catch (...)
315 {
316 std::terminate();
317 }
318 }
319
320 /*!
321 * Locking accessor to the attached backend
322 */
323 locked_backend_ptr locked_backend()
324 {
325 return locked_backend_ptr(m_pBackend, m_BackendMutex);
326 }
327
328 /*!
329 * Enqueues the log record to the backend
330 */
331 void consume(record_view const& rec)
332 {
333 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
334 {
335 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
336 // Wait until flush is done
337 while (m_FlushRequested.load(boost::memory_order_acquire))
338 m_BlockCond.wait(lock);
339 }
340 queue_base_type::enqueue(rec);
341 }
342
343 /*!
344 * The method attempts to pass logging record to the backend
345 */
346 bool try_consume(record_view const& rec)
347 {
348 if (!m_FlushRequested.load(boost::memory_order_acquire))
349 {
350 return queue_base_type::try_enqueue(rec);
351 }
352 else
353 return false;
354 }
355
356 /*!
357 * The method starts record feeding loop and effectively blocks until either of this happens:
358 *
359 * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
360 * \li an exception is thrown while processing a log record in the backend, and the exception is
361 * not terminated by the exception handler, if one is installed
362 *
363 * \pre The sink frontend must be constructed without spawning a dedicated thread
364 */
365 void run()
366 {
367 // First check that no other thread is running
368 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
369
370 // Now start the feeding loop
371 while (true)
372 {
373 do_feed_records();
374 if (!m_StopRequested.load(boost::memory_order_acquire))
375 {
376 // Block until new record is available
377 record_view rec;
378 if (queue_base_type::dequeue_ready(rec))
379 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
380 }
381 else
382 break;
383 }
384 }
385
386 /*!
387 * The method softly interrupts record feeding loop. This method must be called when the \c run
388 * method execution has to be interrupted. Unlike regular thread interruption, calling
389 * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
390 * will attempt to finish its business with the record in progress and return afterwards.
391 * This method can be called either if the sink was created with a dedicated thread,
392 * or if the feeding loop was initiated by user.
393 *
394 * \note Returning from this method does not guarantee that there are no records left buffered
395 * in the sink frontend. It is possible that log records keep coming during and after this
396 * method is called. At some point of execution of this method log records stop being processed,
397 * and all records that come after this point are put into the queue. These records will be
398 * processed upon further calls to \c run or \c feed_records.
399 */
400 void stop()
401 {
402 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
403 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
404 {
405 try
406 {
407 m_StopRequested.store(true, boost::memory_order_release);
408 queue_base_type::interrupt_dequeue();
409 while (m_StopRequested.load(boost::memory_order_acquire))
410 m_BlockCond.wait(lock);
411 }
412 catch (...)
413 {
414 m_StopRequested.store(false, boost::memory_order_release);
415 throw;
416 }
417
418 lock.unlock();
419 m_DedicatedFeedingThread.join();
420 }
421 }
422
423 /*!
424 * The method feeds log records that may have been buffered to the backend and returns
425 *
426 * \pre The sink frontend must be constructed without spawning a dedicated thread
427 */
428 void feed_records()
429 {
430 // First check that no other thread is running
431 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
432
433 // Now start the feeding loop
434 do_feed_records();
435 }
436
437 /*!
438 * The method feeds all log records that may have been buffered to the backend and returns.
439 * Unlike \c feed_records, in case of ordering queueing the method also feeds records
440 * that were enqueued during the ordering window, attempting to empty the queue completely.
441 */
442 void flush()
443 {
444 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
445 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
446 {
447 // There is already a thread feeding records, let it do the job
448 m_FlushRequested.store(true, boost::memory_order_release);
449 queue_base_type::interrupt_dequeue();
450 while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
451 m_BlockCond.wait(lock);
452
453 // The condition may have been signalled when the feeding thread was finishing.
454 // In that case records may not have been flushed, and we do the flush ourselves.
455 if (m_FeedingThreadID != thread::id())
456 return;
457 }
458
459 m_FlushRequested.store(true, boost::memory_order_release);
460
461 // Flush records ourselves. The guard releases the lock.
462 scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
463
464 do_feed_records();
465 }
466
467 private:
468 #ifndef BOOST_LOG_DOXYGEN_PASS
469 //! The method spawns record feeding thread
470 void start_feeding_thread()
471 {
472 boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
473 }
474
475 //! The record feeding loop
476 void do_feed_records()
477 {
478 while (!m_StopRequested.load(boost::memory_order_acquire))
479 {
480 record_view rec;
481 bool dequeued = false;
482 if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
483 dequeued = queue_base_type::try_dequeue_ready(rec);
484 else
485 dequeued = queue_base_type::try_dequeue(rec);
486
487 if (dequeued)
488 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
489 else
490 break;
491 }
492
493 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
494 {
495 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
496 base_type::flush_backend(m_BackendMutex, *m_pBackend);
497 }
498 }
499 #endif // BOOST_LOG_DOXYGEN_PASS
500 };
501
502 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
503 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
504 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
505
506 } // namespace sinks
507
508 BOOST_LOG_CLOSE_NAMESPACE // namespace log
509
510 } // namespace boost
511
512 #include <boost/log/detail/footer.hpp>
513
514 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_