]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | /* |
2 | * Copyright Andrey Semashev 2007 - 2015. | |
3 | * Distributed under the Boost Software License, Version 1.0. | |
4 | * (See accompanying file LICENSE_1_0.txt or copy at | |
5 | * http://www.boost.org/LICENSE_1_0.txt) | |
6 | */ | |
7 | /*! | |
8 | * \file bounded_ordering_queue.hpp | |
9 | * \author Andrey Semashev | |
10 | * \date 06.01.2012 | |
11 | * | |
12 | * The header contains implementation of bounded ordering queueing strategy for | |
13 | * the asynchronous sink frontend. | |
14 | */ | |
15 | ||
16 | #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
17 | #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
18 | ||
19 | #include <boost/log/detail/config.hpp> | |
20 | ||
21 | #ifdef BOOST_HAS_PRAGMA_ONCE | |
22 | #pragma once | |
23 | #endif | |
24 | ||
25 | #if defined(BOOST_LOG_NO_THREADS) | |
26 | #error Boost.Log: This header content is only supported in multithreaded environment | |
27 | #endif | |
28 | ||
29 | #include <cstddef> | |
30 | #include <queue> | |
31 | #include <vector> | |
32 | #include <boost/cstdint.hpp> | |
33 | #include <boost/thread/locks.hpp> | |
34 | #include <boost/thread/mutex.hpp> | |
35 | #include <boost/thread/condition_variable.hpp> | |
36 | #include <boost/thread/thread_time.hpp> | |
37 | #include <boost/date_time/posix_time/posix_time_types.hpp> | |
38 | #include <boost/log/detail/timestamp.hpp> | |
39 | #include <boost/log/detail/enqueued_record.hpp> | |
40 | #include <boost/log/keywords/order.hpp> | |
41 | #include <boost/log/keywords/ordering_window.hpp> | |
42 | #include <boost/log/core/record_view.hpp> | |
43 | #include <boost/log/detail/header.hpp> | |
44 | ||
45 | namespace boost { | |
46 | ||
47 | BOOST_LOG_OPEN_NAMESPACE | |
48 | ||
49 | namespace sinks { | |
50 | ||
51 | /*! | |
52 | * \brief Bounded ordering log record queueing strategy | |
53 | * | |
54 | * The \c bounded_ordering_queue class is intended to be used with | |
55 | * the \c asynchronous_sink frontend as a log record queueing strategy. | |
56 | * | |
57 | * This strategy provides the following properties to the record queueing mechanism: | |
58 | * | |
59 | * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter. | |
60 | * \li Upon reaching the size limit, the queue invokes the overflow handling strategy | |
61 | * specified in the \c OverflowStrategyT template parameter to handle the situation. | |
62 | * The library provides overflow handling strategies for most common cases: | |
63 | * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow | |
64 | * will put the enqueueing thread to wait until there is space in the queue. | |
65 | * \li The queue has a fixed latency window. This means that each log record put | |
66 | * into the queue will normally not be dequeued for a certain period of time. | |
67 | * \li The queue performs stable record ordering within the latency window. | |
68 | * The ordering predicate can be specified in the \c OrderT template parameter. | |
69 | */ | |
70 | template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT > | |
71 | class bounded_ordering_queue : | |
72 | private OverflowStrategyT | |
73 | { | |
74 | private: | |
75 | typedef OverflowStrategyT overflow_strategy; | |
76 | typedef boost::mutex mutex_type; | |
77 | typedef sinks::aux::enqueued_record enqueued_record; | |
78 | ||
79 | typedef std::priority_queue< | |
80 | enqueued_record, | |
81 | std::vector< enqueued_record >, | |
82 | enqueued_record::order< OrderT > | |
83 | > queue_type; | |
84 | ||
85 | private: | |
86 | //! Ordering window duration, in milliseconds | |
87 | const uint64_t m_ordering_window; | |
88 | //! Synchronization primitive | |
89 | mutex_type m_mutex; | |
90 | //! Condition to block the consuming thread on | |
91 | condition_variable m_cond; | |
92 | //! Log record queue | |
93 | queue_type m_queue; | |
94 | //! Interruption flag | |
95 | bool m_interruption_requested; | |
96 | ||
97 | public: | |
98 | /*! | |
99 | * Returns ordering window size specified during initialization | |
100 | */ | |
101 | posix_time::time_duration get_ordering_window() const | |
102 | { | |
103 | return posix_time::milliseconds(m_ordering_window); | |
104 | } | |
105 | ||
106 | /*! | |
107 | * Returns default ordering window size. | |
108 | * The default window size is specific to the operating system thread scheduling mechanism. | |
109 | */ | |
110 | static posix_time::time_duration get_default_ordering_window() | |
111 | { | |
112 | // The main idea behind this parameter is that the ordering window should be large enough | |
113 | // to allow the frontend to order records from different threads on an attribute | |
114 | // that contains system time. Thus this value should be: | |
115 | // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. | |
116 | // For instance, on Windows it defaults to around 15-16 ms. | |
117 | // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to | |
118 | // switch threads on any known OS. It can be tuned for other platforms as needed. | |
119 | return posix_time::milliseconds(30); | |
120 | } | |
121 | ||
122 | protected: | |
123 | //! Initializing constructor | |
124 | template< typename ArgsT > | |
125 | explicit bounded_ordering_queue(ArgsT const& args) : | |
126 | m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()), | |
127 | m_queue(args[keywords::order]), | |
128 | m_interruption_requested(false) | |
129 | { | |
130 | } | |
131 | ||
132 | //! Enqueues log record to the queue | |
133 | void enqueue(record_view const& rec) | |
134 | { | |
135 | unique_lock< mutex_type > lock(m_mutex); | |
136 | std::size_t size = m_queue.size(); | |
137 | for (; size >= MaxQueueSizeV; size = m_queue.size()) | |
138 | { | |
139 | if (!overflow_strategy::on_overflow(rec, lock)) | |
140 | return; | |
141 | } | |
142 | ||
143 | m_queue.push(enqueued_record(rec)); | |
144 | if (size == 0) | |
145 | m_cond.notify_one(); | |
146 | } | |
147 | ||
148 | //! Attempts to enqueue log record to the queue | |
149 | bool try_enqueue(record_view const& rec) | |
150 | { | |
151 | unique_lock< mutex_type > lock(m_mutex, try_to_lock); | |
152 | if (lock.owns_lock()) | |
153 | { | |
154 | const std::size_t size = m_queue.size(); | |
155 | ||
156 | // Do not invoke the bounding strategy in case of overflow as it may block | |
157 | if (size < MaxQueueSizeV) | |
158 | { | |
159 | m_queue.push(enqueued_record(rec)); | |
160 | if (size == 0) | |
161 | m_cond.notify_one(); | |
162 | return true; | |
163 | } | |
164 | } | |
165 | ||
166 | return false; | |
167 | } | |
168 | ||
169 | //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty | |
170 | bool try_dequeue_ready(record_view& rec) | |
171 | { | |
172 | lock_guard< mutex_type > lock(m_mutex); | |
173 | const std::size_t size = m_queue.size(); | |
174 | if (size > 0) | |
175 | { | |
176 | const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
177 | enqueued_record const& elem = m_queue.top(); | |
178 | if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) | |
179 | { | |
180 | // We got a new element | |
181 | rec = elem.m_record; | |
182 | m_queue.pop(); | |
183 | overflow_strategy::on_queue_space_available(); | |
184 | return true; | |
185 | } | |
186 | } | |
187 | ||
188 | return false; | |
189 | } | |
190 | ||
191 | //! Attempts to dequeue log record from the queue, does not block if the queue is empty | |
192 | bool try_dequeue(record_view& rec) | |
193 | { | |
194 | lock_guard< mutex_type > lock(m_mutex); | |
195 | const std::size_t size = m_queue.size(); | |
196 | if (size > 0) | |
197 | { | |
198 | enqueued_record const& elem = m_queue.top(); | |
199 | rec = elem.m_record; | |
200 | m_queue.pop(); | |
201 | overflow_strategy::on_queue_space_available(); | |
202 | return true; | |
203 | } | |
204 | ||
205 | return false; | |
206 | } | |
207 | ||
208 | //! Dequeues log record from the queue, blocks if the queue is empty | |
209 | bool dequeue_ready(record_view& rec) | |
210 | { | |
211 | unique_lock< mutex_type > lock(m_mutex); | |
212 | ||
213 | while (!m_interruption_requested) | |
214 | { | |
215 | const std::size_t size = m_queue.size(); | |
216 | if (size > 0) | |
217 | { | |
218 | const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
219 | enqueued_record const& elem = m_queue.top(); | |
220 | const uint64_t difference = (now - elem.m_timestamp).milliseconds(); | |
221 | if (difference >= m_ordering_window) | |
222 | { | |
223 | rec = elem.m_record; | |
224 | m_queue.pop(); | |
225 | overflow_strategy::on_queue_space_available(); | |
226 | return true; | |
227 | } | |
228 | else | |
229 | { | |
230 | // Wait until the element becomes ready to be processed | |
231 | m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); | |
232 | } | |
233 | } | |
234 | else | |
235 | { | |
236 | m_cond.wait(lock); | |
237 | } | |
238 | } | |
239 | m_interruption_requested = false; | |
240 | ||
241 | return false; | |
242 | } | |
243 | ||
244 | //! Wakes a thread possibly blocked in the \c dequeue method | |
245 | void interrupt_dequeue() | |
246 | { | |
247 | lock_guard< mutex_type > lock(m_mutex); | |
248 | m_interruption_requested = true; | |
249 | overflow_strategy::interrupt(); | |
250 | m_cond.notify_one(); | |
251 | } | |
252 | }; | |
253 | ||
254 | } // namespace sinks | |
255 | ||
256 | BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
257 | ||
258 | } // namespace boost | |
259 | ||
260 | #include <boost/log/detail/footer.hpp> | |
261 | ||
262 | #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ |