]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/log/include/boost/log/sinks/bounded_ordering_queue.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / log / include / boost / log / sinks / bounded_ordering_queue.hpp
CommitLineData
7c673cae
FG
1/*
2 * Copyright Andrey Semashev 2007 - 2015.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7/*!
8 * \file bounded_ordering_queue.hpp
9 * \author Andrey Semashev
10 * \date 06.01.2012
11 *
12 * The header contains implementation of bounded ordering queueing strategy for
13 * the asynchronous sink frontend.
14 */
15
16#ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
17#define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
18
19#include <boost/log/detail/config.hpp>
20
21#ifdef BOOST_HAS_PRAGMA_ONCE
22#pragma once
23#endif
24
25#if defined(BOOST_LOG_NO_THREADS)
26#error Boost.Log: This header content is only supported in multithreaded environment
27#endif
28
29#include <cstddef>
30#include <queue>
31#include <vector>
32#include <boost/cstdint.hpp>
33#include <boost/thread/locks.hpp>
34#include <boost/thread/mutex.hpp>
35#include <boost/thread/condition_variable.hpp>
36#include <boost/thread/thread_time.hpp>
37#include <boost/date_time/posix_time/posix_time_types.hpp>
38#include <boost/log/detail/timestamp.hpp>
39#include <boost/log/detail/enqueued_record.hpp>
40#include <boost/log/keywords/order.hpp>
41#include <boost/log/keywords/ordering_window.hpp>
42#include <boost/log/core/record_view.hpp>
43#include <boost/log/detail/header.hpp>
44
45namespace boost {
46
47BOOST_LOG_OPEN_NAMESPACE
48
49namespace sinks {
50
51/*!
52 * \brief Bounded ordering log record queueing strategy
53 *
54 * The \c bounded_ordering_queue class is intended to be used with
55 * the \c asynchronous_sink frontend as a log record queueing strategy.
56 *
57 * This strategy provides the following properties to the record queueing mechanism:
58 *
59 * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
60 * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
61 * specified in the \c OverflowStrategyT template parameter to handle the situation.
62 * The library provides overflow handling strategies for most common cases:
63 * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
64 * will put the enqueueing thread to wait until there is space in the queue.
65 * \li The queue has a fixed latency window. This means that each log record put
66 * into the queue will normally not be dequeued for a certain period of time.
67 * \li The queue performs stable record ordering within the latency window.
68 * The ordering predicate can be specified in the \c OrderT template parameter.
69 */
70template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
71class bounded_ordering_queue :
72 private OverflowStrategyT
73{
74private:
75 typedef OverflowStrategyT overflow_strategy;
76 typedef boost::mutex mutex_type;
77 typedef sinks::aux::enqueued_record enqueued_record;
78
79 typedef std::priority_queue<
80 enqueued_record,
81 std::vector< enqueued_record >,
82 enqueued_record::order< OrderT >
83 > queue_type;
84
85private:
86 //! Ordering window duration, in milliseconds
87 const uint64_t m_ordering_window;
88 //! Synchronization primitive
89 mutex_type m_mutex;
90 //! Condition to block the consuming thread on
91 condition_variable m_cond;
92 //! Log record queue
93 queue_type m_queue;
94 //! Interruption flag
95 bool m_interruption_requested;
96
97public:
98 /*!
99 * Returns ordering window size specified during initialization
100 */
101 posix_time::time_duration get_ordering_window() const
102 {
103 return posix_time::milliseconds(m_ordering_window);
104 }
105
106 /*!
107 * Returns default ordering window size.
108 * The default window size is specific to the operating system thread scheduling mechanism.
109 */
110 static posix_time::time_duration get_default_ordering_window()
111 {
112 // The main idea behind this parameter is that the ordering window should be large enough
113 // to allow the frontend to order records from different threads on an attribute
114 // that contains system time. Thus this value should be:
115 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
116 // For instance, on Windows it defaults to around 15-16 ms.
117 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
118 // switch threads on any known OS. It can be tuned for other platforms as needed.
119 return posix_time::milliseconds(30);
120 }
121
122protected:
123 //! Initializing constructor
124 template< typename ArgsT >
125 explicit bounded_ordering_queue(ArgsT const& args) :
126 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
127 m_queue(args[keywords::order]),
128 m_interruption_requested(false)
129 {
130 }
131
132 //! Enqueues log record to the queue
133 void enqueue(record_view const& rec)
134 {
135 unique_lock< mutex_type > lock(m_mutex);
136 std::size_t size = m_queue.size();
137 for (; size >= MaxQueueSizeV; size = m_queue.size())
138 {
139 if (!overflow_strategy::on_overflow(rec, lock))
140 return;
141 }
142
143 m_queue.push(enqueued_record(rec));
144 if (size == 0)
145 m_cond.notify_one();
146 }
147
148 //! Attempts to enqueue log record to the queue
149 bool try_enqueue(record_view const& rec)
150 {
151 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
152 if (lock.owns_lock())
153 {
154 const std::size_t size = m_queue.size();
155
156 // Do not invoke the bounding strategy in case of overflow as it may block
157 if (size < MaxQueueSizeV)
158 {
159 m_queue.push(enqueued_record(rec));
160 if (size == 0)
161 m_cond.notify_one();
162 return true;
163 }
164 }
165
166 return false;
167 }
168
169 //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
170 bool try_dequeue_ready(record_view& rec)
171 {
172 lock_guard< mutex_type > lock(m_mutex);
173 const std::size_t size = m_queue.size();
174 if (size > 0)
175 {
176 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
177 enqueued_record const& elem = m_queue.top();
178 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
179 {
180 // We got a new element
181 rec = elem.m_record;
182 m_queue.pop();
183 overflow_strategy::on_queue_space_available();
184 return true;
185 }
186 }
187
188 return false;
189 }
190
191 //! Attempts to dequeue log record from the queue, does not block if the queue is empty
192 bool try_dequeue(record_view& rec)
193 {
194 lock_guard< mutex_type > lock(m_mutex);
195 const std::size_t size = m_queue.size();
196 if (size > 0)
197 {
198 enqueued_record const& elem = m_queue.top();
199 rec = elem.m_record;
200 m_queue.pop();
201 overflow_strategy::on_queue_space_available();
202 return true;
203 }
204
205 return false;
206 }
207
208 //! Dequeues log record from the queue, blocks if the queue is empty
209 bool dequeue_ready(record_view& rec)
210 {
211 unique_lock< mutex_type > lock(m_mutex);
212
213 while (!m_interruption_requested)
214 {
215 const std::size_t size = m_queue.size();
216 if (size > 0)
217 {
218 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
219 enqueued_record const& elem = m_queue.top();
220 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
221 if (difference >= m_ordering_window)
222 {
223 rec = elem.m_record;
224 m_queue.pop();
225 overflow_strategy::on_queue_space_available();
226 return true;
227 }
228 else
229 {
230 // Wait until the element becomes ready to be processed
231 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
232 }
233 }
234 else
235 {
236 m_cond.wait(lock);
237 }
238 }
239 m_interruption_requested = false;
240
241 return false;
242 }
243
244 //! Wakes a thread possibly blocked in the \c dequeue method
245 void interrupt_dequeue()
246 {
247 lock_guard< mutex_type > lock(m_mutex);
248 m_interruption_requested = true;
249 overflow_strategy::interrupt();
250 m_cond.notify_one();
251 }
252};
253
254} // namespace sinks
255
256BOOST_LOG_CLOSE_NAMESPACE // namespace log
257
258} // namespace boost
259
260#include <boost/log/detail/footer.hpp>
261
262#endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_