]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | /* |
2 | * Copyright Andrey Semashev 2007 - 2015. | |
3 | * Distributed under the Boost Software License, Version 1.0. | |
4 | * (See accompanying file LICENSE_1_0.txt or copy at | |
5 | * http://www.boost.org/LICENSE_1_0.txt) | |
6 | */ | |
7 | /*! | |
8 | * \file async_frontend.hpp | |
9 | * \author Andrey Semashev | |
10 | * \date 14.07.2009 | |
11 | * | |
12 | * The header contains implementation of asynchronous sink frontend. | |
13 | */ | |
14 | ||
15 | #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ | |
16 | #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ | |
17 | ||
18 | #include <exception> // std::terminate | |
19 | #include <boost/log/detail/config.hpp> | |
20 | ||
21 | #ifdef BOOST_HAS_PRAGMA_ONCE | |
22 | #pragma once | |
23 | #endif | |
24 | ||
25 | #if defined(BOOST_LOG_NO_THREADS) | |
26 | #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment | |
27 | #endif | |
28 | ||
29 | #include <boost/bind.hpp> | |
30 | #include <boost/static_assert.hpp> | |
31 | #include <boost/memory_order.hpp> | |
32 | #include <boost/atomic/atomic.hpp> | |
33 | #include <boost/smart_ptr/shared_ptr.hpp> | |
34 | #include <boost/smart_ptr/make_shared_object.hpp> | |
35 | #include <boost/preprocessor/control/if.hpp> | |
36 | #include <boost/preprocessor/comparison/equal.hpp> | |
37 | #include <boost/thread/locks.hpp> | |
38 | #include <boost/thread/recursive_mutex.hpp> | |
39 | #include <boost/thread/thread.hpp> | |
40 | #include <boost/thread/condition_variable.hpp> | |
41 | #include <boost/log/exceptions.hpp> | |
42 | #include <boost/log/detail/locking_ptr.hpp> | |
43 | #include <boost/log/detail/parameter_tools.hpp> | |
44 | #include <boost/log/core/record_view.hpp> | |
45 | #include <boost/log/sinks/basic_sink_frontend.hpp> | |
46 | #include <boost/log/sinks/frontend_requirements.hpp> | |
47 | #include <boost/log/sinks/unbounded_fifo_queue.hpp> | |
48 | #include <boost/log/keywords/start_thread.hpp> | |
49 | #include <boost/log/detail/header.hpp> | |
50 | ||
51 | namespace boost { | |
52 | ||
53 | BOOST_LOG_OPEN_NAMESPACE | |
54 | ||
55 | namespace sinks { | |
56 | ||
57 | #ifndef BOOST_LOG_DOXYGEN_PASS | |
58 | ||
59 | #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(n, data)\ | |
60 | template< typename T0 >\ | |
61 | explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\ | |
62 | base_type(true),\ | |
63 | queue_base_type(arg0),\ | |
64 | m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\ | |
65 | m_StopRequested(false),\ | |
66 | m_FlushRequested(false)\ | |
67 | {\ | |
68 | if (arg0[keywords::start_thread | true])\ | |
69 | start_feeding_thread();\ | |
70 | }\ | |
71 | template< typename T0 >\ | |
72 | explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\ | |
73 | base_type(true),\ | |
74 | queue_base_type(arg0),\ | |
75 | m_pBackend(backend),\ | |
76 | m_StopRequested(false),\ | |
77 | m_FlushRequested(false)\ | |
78 | {\ | |
79 | if (arg0[keywords::start_thread | true])\ | |
80 | start_feeding_thread();\ | |
81 | } | |
82 | ||
83 | #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(n, data)\ | |
84 | template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ | |
85 | explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ | |
86 | base_type(true),\ | |
87 | queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ | |
88 | m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\ | |
89 | m_StopRequested(false),\ | |
90 | m_FlushRequested(false)\ | |
91 | {\ | |
92 | if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ | |
93 | start_feeding_thread();\ | |
94 | }\ | |
95 | template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ | |
96 | explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ | |
97 | base_type(true),\ | |
98 | queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ | |
99 | m_pBackend(backend),\ | |
100 | m_StopRequested(false),\ | |
101 | m_FlushRequested(false)\ | |
102 | {\ | |
103 | if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ | |
104 | start_feeding_thread();\ | |
105 | } | |
106 | ||
107 | #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\ | |
108 | BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(n, data) | |
109 | ||
110 | #endif // BOOST_LOG_DOXYGEN_PASS | |
111 | ||
112 | /*! | |
113 | * \brief Asynchronous logging sink frontend | |
114 | * | |
115 | * The frontend starts a separate thread on construction. All logging records are passed | |
116 | * to the backend in this dedicated thread only. | |
117 | */ | |
118 | template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue > | |
119 | class asynchronous_sink : | |
120 | public aux::make_sink_frontend_base< SinkBackendT >::type, | |
121 | public QueueingStrategyT | |
122 | { | |
123 | typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type; | |
124 | typedef QueueingStrategyT queue_base_type; | |
125 | ||
126 | private: | |
127 | //! Backend synchronization mutex type | |
128 | typedef boost::recursive_mutex backend_mutex_type; | |
129 | //! Frontend synchronization mutex type | |
130 | typedef typename base_type::mutex_type frontend_mutex_type; | |
131 | ||
132 | //! A scope guard that implements thread ID management | |
133 | class scoped_thread_id | |
134 | { | |
135 | private: | |
136 | frontend_mutex_type& m_Mutex; | |
137 | condition_variable_any& m_Cond; | |
138 | thread::id& m_ThreadID; | |
139 | boost::atomic< bool >& m_StopRequested; | |
140 | ||
141 | public: | |
142 | //! Initializing constructor | |
143 | scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr) | |
144 | : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) | |
145 | { | |
146 | lock_guard< frontend_mutex_type > lock(m_Mutex); | |
147 | if (m_ThreadID != thread::id()) | |
148 | BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); | |
149 | m_ThreadID = this_thread::get_id(); | |
150 | } | |
151 | //! Initializing constructor | |
152 | scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr) | |
153 | : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) | |
154 | { | |
155 | unique_lock< frontend_mutex_type > lock(move(l)); | |
156 | if (m_ThreadID != thread::id()) | |
157 | BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); | |
158 | m_ThreadID = this_thread::get_id(); | |
159 | } | |
160 | //! Destructor | |
161 | ~scoped_thread_id() | |
162 | { | |
163 | try | |
164 | { | |
165 | lock_guard< frontend_mutex_type > lock(m_Mutex); | |
166 | m_StopRequested.store(false, boost::memory_order_release); | |
167 | m_ThreadID = thread::id(); | |
168 | m_Cond.notify_all(); | |
169 | } | |
170 | catch (...) | |
171 | { | |
172 | } | |
173 | } | |
174 | ||
175 | BOOST_DELETED_FUNCTION(scoped_thread_id(scoped_thread_id const&)) | |
176 | BOOST_DELETED_FUNCTION(scoped_thread_id& operator= (scoped_thread_id const&)) | |
177 | }; | |
178 | ||
179 | //! A scope guard that resets a flag on destructor | |
180 | class scoped_flag | |
181 | { | |
182 | private: | |
183 | frontend_mutex_type& m_Mutex; | |
184 | condition_variable_any& m_Cond; | |
185 | boost::atomic< bool >& m_Flag; | |
186 | ||
187 | public: | |
188 | explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) : | |
189 | m_Mutex(mut), m_Cond(cond), m_Flag(f) | |
190 | { | |
191 | } | |
192 | ~scoped_flag() | |
193 | { | |
194 | try | |
195 | { | |
196 | lock_guard< frontend_mutex_type > lock(m_Mutex); | |
197 | m_Flag.store(false, boost::memory_order_release); | |
198 | m_Cond.notify_all(); | |
199 | } | |
200 | catch (...) | |
201 | { | |
202 | } | |
203 | } | |
204 | ||
205 | BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&)) | |
206 | BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&)) | |
207 | }; | |
208 | ||
209 | public: | |
210 | //! Sink implementation type | |
211 | typedef SinkBackendT sink_backend_type; | |
212 | //! \cond | |
213 | BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met"); | |
214 | //! \endcond | |
215 | ||
216 | #ifndef BOOST_LOG_DOXYGEN_PASS | |
217 | ||
218 | //! A pointer type that locks the backend until it's destroyed | |
219 | typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr; | |
220 | ||
221 | #else // BOOST_LOG_DOXYGEN_PASS | |
222 | ||
223 | //! A pointer type that locks the backend until it's destroyed | |
224 | typedef implementation_defined locked_backend_ptr; | |
225 | ||
226 | #endif // BOOST_LOG_DOXYGEN_PASS | |
227 | ||
228 | private: | |
229 | //! Synchronization mutex | |
230 | backend_mutex_type m_BackendMutex; | |
231 | //! Pointer to the backend | |
232 | const shared_ptr< sink_backend_type > m_pBackend; | |
233 | ||
234 | //! Dedicated record feeding thread | |
235 | thread m_DedicatedFeedingThread; | |
236 | //! Feeding thread ID | |
237 | thread::id m_FeedingThreadID; | |
238 | //! Condition variable to implement blocking operations | |
239 | condition_variable_any m_BlockCond; | |
240 | ||
241 | //! The flag indicates that the feeding loop has to be stopped | |
242 | boost::atomic< bool > m_StopRequested; | |
243 | //! The flag indicates that queue flush has been requested | |
244 | boost::atomic< bool > m_FlushRequested; | |
245 | ||
246 | public: | |
247 | /*! | |
248 | * Default constructor. Constructs the sink backend instance. | |
249 | * Requires the backend to be default-constructible. | |
250 | * | |
251 | * \param start_thread If \c true, the frontend creates a thread to feed | |
252 | * log records to the backend. Otherwise no thread is | |
253 | * started and it is assumed that the user will call | |
254 | * either \c run or \c feed_records himself. | |
255 | */ | |
256 | asynchronous_sink(bool start_thread = true) : | |
257 | base_type(true), | |
258 | m_pBackend(boost::make_shared< sink_backend_type >()), | |
259 | m_StopRequested(false), | |
260 | m_FlushRequested(false) | |
261 | { | |
262 | if (start_thread) | |
263 | start_feeding_thread(); | |
264 | } | |
265 | /*! | |
266 | * Constructor attaches user-constructed backend instance | |
267 | * | |
268 | * \param backend Pointer to the backend instance. | |
269 | * \param start_thread If \c true, the frontend creates a thread to feed | |
270 | * log records to the backend. Otherwise no thread is | |
271 | * started and it is assumed that the user will call | |
272 | * either \c run or \c feed_records himself. | |
273 | * | |
274 | * \pre \a backend is not \c NULL. | |
275 | */ | |
276 | explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) : | |
277 | base_type(true), | |
278 | m_pBackend(backend), | |
279 | m_StopRequested(false), | |
280 | m_FlushRequested(false) | |
281 | { | |
282 | if (start_thread) | |
283 | start_feeding_thread(); | |
284 | } | |
285 | ||
286 | /*! | |
287 | * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor. | |
288 | * Refer to the backend documentation for the list of supported parameters. | |
289 | * | |
290 | * The frontend uses the following named parameters: | |
291 | * | |
292 | * \li start_thread - If \c true, the frontend creates a thread to feed | |
293 | * log records to the backend. Otherwise no thread is | |
294 | * started and it is assumed that the user will call | |
295 | * either \c run or \c feed_records himself. | |
296 | */ | |
297 | #ifndef BOOST_LOG_DOXYGEN_PASS | |
298 | BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~) | |
299 | #else | |
300 | template< typename... Args > | |
301 | explicit asynchronous_sink(Args&&... args); | |
302 | #endif | |
303 | ||
304 | /*! | |
305 | * Destructor. Implicitly stops the dedicated feeding thread, if one is running. | |
306 | */ | |
307 | ~asynchronous_sink() BOOST_NOEXCEPT | |
308 | { | |
309 | try | |
310 | { | |
311 | boost::this_thread::disable_interruption no_interrupts; | |
312 | stop(); | |
313 | } | |
314 | catch (...) | |
315 | { | |
316 | std::terminate(); | |
317 | } | |
318 | } | |
319 | ||
320 | /*! | |
321 | * Locking accessor to the attached backend | |
322 | */ | |
323 | locked_backend_ptr locked_backend() | |
324 | { | |
325 | return locked_backend_ptr(m_pBackend, m_BackendMutex); | |
326 | } | |
327 | ||
328 | /*! | |
329 | * Enqueues the log record to the backend | |
330 | */ | |
331 | void consume(record_view const& rec) | |
332 | { | |
333 | if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire))) | |
334 | { | |
335 | unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); | |
336 | // Wait until flush is done | |
337 | while (m_FlushRequested.load(boost::memory_order_acquire)) | |
338 | m_BlockCond.wait(lock); | |
339 | } | |
340 | queue_base_type::enqueue(rec); | |
341 | } | |
342 | ||
343 | /*! | |
344 | * The method attempts to pass logging record to the backend | |
345 | */ | |
346 | bool try_consume(record_view const& rec) | |
347 | { | |
348 | if (!m_FlushRequested.load(boost::memory_order_acquire)) | |
349 | { | |
350 | return queue_base_type::try_enqueue(rec); | |
351 | } | |
352 | else | |
353 | return false; | |
354 | } | |
355 | ||
356 | /*! | |
357 | * The method starts record feeding loop and effectively blocks until either of this happens: | |
358 | * | |
359 | * \li the thread is interrupted due to either standard thread interruption or a call to \c stop | |
360 | * \li an exception is thrown while processing a log record in the backend, and the exception is | |
361 | * not terminated by the exception handler, if one is installed | |
362 | * | |
363 | * \pre The sink frontend must be constructed without spawning a dedicated thread | |
364 | */ | |
365 | void run() | |
366 | { | |
367 | // First check that no other thread is running | |
368 | scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); | |
369 | ||
370 | // Now start the feeding loop | |
371 | while (true) | |
372 | { | |
373 | do_feed_records(); | |
374 | if (!m_StopRequested.load(boost::memory_order_acquire)) | |
375 | { | |
376 | // Block until new record is available | |
377 | record_view rec; | |
378 | if (queue_base_type::dequeue_ready(rec)) | |
379 | base_type::feed_record(rec, m_BackendMutex, *m_pBackend); | |
380 | } | |
381 | else | |
382 | break; | |
383 | } | |
384 | } | |
385 | ||
386 | /*! | |
387 | * The method softly interrupts record feeding loop. This method must be called when the \c run | |
388 | * method execution has to be interrupted. Unlike regular thread interruption, calling | |
389 | * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend | |
390 | * will attempt to finish its business with the record in progress and return afterwards. | |
391 | * This method can be called either if the sink was created with a dedicated thread, | |
392 | * or if the feeding loop was initiated by user. | |
393 | * | |
394 | * \note Returning from this method does not guarantee that there are no records left buffered | |
395 | * in the sink frontend. It is possible that log records keep coming during and after this | |
396 | * method is called. At some point of execution of this method log records stop being processed, | |
397 | * and all records that come after this point are put into the queue. These records will be | |
398 | * processed upon further calls to \c run or \c feed_records. | |
399 | */ | |
400 | void stop() | |
401 | { | |
402 | unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); | |
403 | if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) | |
404 | { | |
405 | try | |
406 | { | |
407 | m_StopRequested.store(true, boost::memory_order_release); | |
408 | queue_base_type::interrupt_dequeue(); | |
409 | while (m_StopRequested.load(boost::memory_order_acquire)) | |
410 | m_BlockCond.wait(lock); | |
411 | } | |
412 | catch (...) | |
413 | { | |
414 | m_StopRequested.store(false, boost::memory_order_release); | |
415 | throw; | |
416 | } | |
417 | ||
418 | lock.unlock(); | |
419 | m_DedicatedFeedingThread.join(); | |
420 | } | |
421 | } | |
422 | ||
423 | /*! | |
424 | * The method feeds log records that may have been buffered to the backend and returns | |
425 | * | |
426 | * \pre The sink frontend must be constructed without spawning a dedicated thread | |
427 | */ | |
428 | void feed_records() | |
429 | { | |
430 | // First check that no other thread is running | |
431 | scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); | |
432 | ||
433 | // Now start the feeding loop | |
434 | do_feed_records(); | |
435 | } | |
436 | ||
437 | /*! | |
438 | * The method feeds all log records that may have been buffered to the backend and returns. | |
439 | * Unlike \c feed_records, in case of ordering queueing the method also feeds records | |
440 | * that were enqueued during the ordering window, attempting to empty the queue completely. | |
441 | */ | |
442 | void flush() | |
443 | { | |
444 | unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); | |
445 | if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) | |
446 | { | |
447 | // There is already a thread feeding records, let it do the job | |
448 | m_FlushRequested.store(true, boost::memory_order_release); | |
449 | queue_base_type::interrupt_dequeue(); | |
450 | while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire)) | |
451 | m_BlockCond.wait(lock); | |
452 | ||
453 | // The condition may have been signalled when the feeding thread was finishing. | |
454 | // In that case records may not have been flushed, and we do the flush ourselves. | |
455 | if (m_FeedingThreadID != thread::id()) | |
456 | return; | |
457 | } | |
458 | ||
459 | m_FlushRequested.store(true, boost::memory_order_release); | |
460 | ||
461 | // Flush records ourselves. The guard releases the lock. | |
462 | scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested); | |
463 | ||
464 | do_feed_records(); | |
465 | } | |
466 | ||
467 | private: | |
468 | #ifndef BOOST_LOG_DOXYGEN_PASS | |
469 | //! The method spawns record feeding thread | |
470 | void start_feeding_thread() | |
471 | { | |
472 | boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread); | |
473 | } | |
474 | ||
475 | //! The record feeding loop | |
476 | void do_feed_records() | |
477 | { | |
478 | while (!m_StopRequested.load(boost::memory_order_acquire)) | |
479 | { | |
480 | record_view rec; | |
481 | bool dequeued = false; | |
482 | if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire))) | |
483 | dequeued = queue_base_type::try_dequeue_ready(rec); | |
484 | else | |
485 | dequeued = queue_base_type::try_dequeue(rec); | |
486 | ||
487 | if (dequeued) | |
488 | base_type::feed_record(rec, m_BackendMutex, *m_pBackend); | |
489 | else | |
490 | break; | |
491 | } | |
492 | ||
493 | if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire))) | |
494 | { | |
495 | scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested); | |
496 | base_type::flush_backend(m_BackendMutex, *m_pBackend); | |
497 | } | |
498 | } | |
499 | #endif // BOOST_LOG_DOXYGEN_PASS | |
500 | }; | |
501 | ||
502 | #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1 | |
503 | #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N | |
504 | #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL | |
505 | ||
506 | } // namespace sinks | |
507 | ||
508 | BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
509 | ||
510 | } // namespace boost | |
511 | ||
512 | #include <boost/log/detail/footer.hpp> | |
513 | ||
514 | #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ |