]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | /* |
2 | * Copyright Andrey Semashev 2007 - 2015. | |
3 | * Distributed under the Boost Software License, Version 1.0. | |
4 | * (See accompanying file LICENSE_1_0.txt or copy at | |
5 | * http://www.boost.org/LICENSE_1_0.txt) | |
6 | */ | |
7 | /*! | |
8 | * \file unbounded_ordering_queue.hpp | |
9 | * \author Andrey Semashev | |
10 | * \date 24.07.2011 | |
11 | * | |
12 | * The header contains implementation of unbounded ordering record queueing strategy for | |
13 | * the asynchronous sink frontend. | |
14 | */ | |
15 | ||
16 | #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
17 | #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ | |
18 | ||
19 | #include <boost/log/detail/config.hpp> | |
20 | ||
21 | #ifdef BOOST_HAS_PRAGMA_ONCE | |
22 | #pragma once | |
23 | #endif | |
24 | ||
25 | #if defined(BOOST_LOG_NO_THREADS) | |
26 | #error Boost.Log: This header content is only supported in multithreaded environment | |
27 | #endif | |
28 | ||
29 | #include <queue> | |
30 | #include <vector> | |
31 | #include <boost/cstdint.hpp> | |
32 | #include <boost/thread/locks.hpp> | |
33 | #include <boost/thread/mutex.hpp> | |
34 | #include <boost/thread/condition_variable.hpp> | |
35 | #include <boost/thread/thread_time.hpp> | |
36 | #include <boost/date_time/posix_time/posix_time_types.hpp> | |
37 | #include <boost/log/detail/timestamp.hpp> | |
38 | #include <boost/log/detail/enqueued_record.hpp> | |
39 | #include <boost/log/keywords/order.hpp> | |
40 | #include <boost/log/keywords/ordering_window.hpp> | |
41 | #include <boost/log/core/record_view.hpp> | |
42 | #include <boost/log/detail/header.hpp> | |
43 | ||
44 | namespace boost { | |
45 | ||
46 | BOOST_LOG_OPEN_NAMESPACE | |
47 | ||
48 | namespace sinks { | |
49 | ||
50 | /*! | |
51 | * \brief Unbounded ordering log record queueing strategy | |
52 | * | |
53 | * The \c unbounded_ordering_queue class is intended to be used with | |
54 | * the \c asynchronous_sink frontend as a log record queueing strategy. | |
55 | * | |
56 | * This strategy provides the following properties to the record queueing mechanism: | |
57 | * | |
58 | * \li The queue has no size limits. | |
59 | * \li The queue has a fixed latency window. This means that each log record put | |
60 | * into the queue will normally not be dequeued for a certain period of time. | |
61 | * \li The queue performs stable record ordering within the latency window. | |
62 | * The ordering predicate can be specified in the \c OrderT template parameter. | |
63 | * | |
64 | * Since this queue has no size limits, it may grow uncontrollably if sink backends | |
65 | * do not dequeue log records fast enough. When this is an issue, it is recommended to | |
66 | * use one of the bounded strategies. | |
67 | */ | |
68 | template< typename OrderT > | |
69 | class unbounded_ordering_queue | |
70 | { | |
71 | private: | |
72 | typedef boost::mutex mutex_type; | |
73 | typedef sinks::aux::enqueued_record enqueued_record; | |
74 | ||
75 | typedef std::priority_queue< | |
76 | enqueued_record, | |
77 | std::vector< enqueued_record >, | |
78 | enqueued_record::order< OrderT > | |
79 | > queue_type; | |
80 | ||
81 | private: | |
82 | //! Ordering window duration, in milliseconds | |
83 | const uint64_t m_ordering_window; | |
84 | //! Synchronization mutex | |
85 | mutex_type m_mutex; | |
86 | //! Condition for blocking | |
87 | condition_variable m_cond; | |
88 | //! Thread-safe queue | |
89 | queue_type m_queue; | |
90 | //! Interruption flag | |
91 | bool m_interruption_requested; | |
92 | ||
93 | public: | |
94 | /*! | |
95 | * Returns ordering window size specified during initialization | |
96 | */ | |
97 | posix_time::time_duration get_ordering_window() const | |
98 | { | |
99 | return posix_time::milliseconds(m_ordering_window); | |
100 | } | |
101 | ||
102 | /*! | |
103 | * Returns default ordering window size. | |
104 | * The default window size is specific to the operating system thread scheduling mechanism. | |
105 | */ | |
106 | static posix_time::time_duration get_default_ordering_window() | |
107 | { | |
108 | // The main idea behind this parameter is that the ordering window should be large enough | |
109 | // to allow the frontend to order records from different threads on an attribute | |
110 | // that contains system time. Thus this value should be: | |
111 | // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. | |
112 | // For instance, on Windows it defaults to around 15-16 ms. | |
113 | // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to | |
114 | // switch threads on any known OS. It can be tuned for other platforms as needed. | |
115 | return posix_time::milliseconds(30); | |
116 | } | |
117 | ||
118 | protected: | |
119 | //! Initializing constructor | |
120 | template< typename ArgsT > | |
121 | explicit unbounded_ordering_queue(ArgsT const& args) : | |
122 | m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()), | |
123 | m_queue(args[keywords::order]), | |
124 | m_interruption_requested(false) | |
125 | { | |
126 | } | |
127 | ||
128 | //! Enqueues log record to the queue | |
129 | void enqueue(record_view const& rec) | |
130 | { | |
131 | lock_guard< mutex_type > lock(m_mutex); | |
132 | enqueue_unlocked(rec); | |
133 | } | |
134 | ||
135 | //! Attempts to enqueue log record to the queue | |
136 | bool try_enqueue(record_view const& rec) | |
137 | { | |
138 | unique_lock< mutex_type > lock(m_mutex, try_to_lock); | |
139 | if (lock.owns_lock()) | |
140 | { | |
141 | enqueue_unlocked(rec); | |
142 | return true; | |
143 | } | |
144 | else | |
145 | return false; | |
146 | } | |
147 | ||
148 | //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed | |
149 | bool try_dequeue_ready(record_view& rec) | |
150 | { | |
151 | lock_guard< mutex_type > lock(m_mutex); | |
152 | if (!m_queue.empty()) | |
153 | { | |
154 | const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
155 | enqueued_record const& elem = m_queue.top(); | |
156 | if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) | |
157 | { | |
158 | // We got a new element | |
159 | rec = elem.m_record; | |
160 | m_queue.pop(); | |
161 | return true; | |
162 | } | |
163 | } | |
164 | ||
165 | return false; | |
166 | } | |
167 | ||
168 | //! Attempts to dequeue log record from the queue, does not block. | |
169 | bool try_dequeue(record_view& rec) | |
170 | { | |
171 | lock_guard< mutex_type > lock(m_mutex); | |
172 | if (!m_queue.empty()) | |
173 | { | |
174 | enqueued_record const& elem = m_queue.top(); | |
175 | rec = elem.m_record; | |
176 | m_queue.pop(); | |
177 | return true; | |
178 | } | |
179 | ||
180 | return false; | |
181 | } | |
182 | ||
183 | //! Dequeues log record from the queue, blocks if no log records are ready to be processed | |
184 | bool dequeue_ready(record_view& rec) | |
185 | { | |
186 | unique_lock< mutex_type > lock(m_mutex); | |
187 | while (!m_interruption_requested) | |
188 | { | |
189 | if (!m_queue.empty()) | |
190 | { | |
191 | const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); | |
192 | enqueued_record const& elem = m_queue.top(); | |
193 | const uint64_t difference = (now - elem.m_timestamp).milliseconds(); | |
194 | if (difference >= m_ordering_window) | |
195 | { | |
196 | // We got a new element | |
197 | rec = elem.m_record; | |
198 | m_queue.pop(); | |
199 | return true; | |
200 | } | |
201 | else | |
202 | { | |
203 | // Wait until the element becomes ready to be processed | |
204 | m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); | |
205 | } | |
206 | } | |
207 | else | |
208 | { | |
209 | // Wait for an element to come | |
210 | m_cond.wait(lock); | |
211 | } | |
212 | } | |
213 | m_interruption_requested = false; | |
214 | ||
215 | return false; | |
216 | } | |
217 | ||
218 | //! Wakes a thread possibly blocked in the \c dequeue method | |
219 | void interrupt_dequeue() | |
220 | { | |
221 | lock_guard< mutex_type > lock(m_mutex); | |
222 | m_interruption_requested = true; | |
223 | m_cond.notify_one(); | |
224 | } | |
225 | ||
226 | private: | |
227 | //! Enqueues a log record | |
228 | void enqueue_unlocked(record_view const& rec) | |
229 | { | |
230 | const bool was_empty = m_queue.empty(); | |
231 | m_queue.push(enqueued_record(rec)); | |
232 | if (was_empty) | |
233 | m_cond.notify_one(); | |
234 | } | |
235 | }; | |
236 | ||
237 | } // namespace sinks | |
238 | ||
239 | BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
240 | ||
241 | } // namespace boost | |
242 | ||
243 | #include <boost/log/detail/footer.hpp> | |
244 | ||
245 | #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ |