1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
10 #include "opentelemetry/sdk/common/atomic_unique_ptr.h"
11 #include "opentelemetry/sdk/common/circular_buffer_range.h"
12 #include "opentelemetry/version.h"
14 OPENTELEMETRY_BEGIN_NAMESPACE
20 * A lock-free circular buffer that supports multiple concurrent producers
21 * and a single consumer.
27 explicit CircularBuffer(size_t max_size
)
28 : data_
{new AtomicUniquePtr
<T
>[max_size
+ 1]}, capacity_
{max_size
+ 1}
32 * @return a range of the elements in the circular buffer
34 * Note: This method must only be called from the consumer thread.
36 CircularBufferRange
<const AtomicUniquePtr
<T
>> Peek() const noexcept
38 return const_cast<CircularBuffer
*>(this)->PeekImpl();
42 * Consume elements from the circular buffer's tail.
43 * @param n the number of elements to consume
44 * @param callback the callback to invoke with an AtomicUniquePtr to each
47 * Note: The callback must set the passed AtomicUniquePtr to null.
49 * Note: This method must only be called from the consumer thread.
51 template <class Callback
>
52 void Consume(size_t n
, Callback callback
) noexcept
54 assert(n
<= static_cast<size_t>(head_
- tail_
));
55 auto range
= PeekImpl().Take(n
);
56 static_assert(noexcept(callback(range
)), "callback not allowed to throw");
62 * Consume elements from the circular buffer's tail.
63 * @param n the number of elements to consume
65 * Note: This method must only be called from the consumer thread.
67 void Consume(size_t n
) noexcept
69 Consume(n
, [](CircularBufferRange
<AtomicUniquePtr
<T
>> &range
) noexcept
{
70 range
.ForEach([](AtomicUniquePtr
<T
> &ptr
) noexcept
{
78 * Adds an element into the circular buffer.
79 * @param ptr a pointer to the element to add
80 * @return true if the element was successfully added; false, otherwise.
82 bool Add(std::unique_ptr
<T
> &ptr
) noexcept
86 uint64_t tail
= tail_
;
87 uint64_t head
= head_
;
89 // The circular buffer is full, so return false.
90 if (head
- tail
>= capacity_
- 1)
95 uint64_t head_index
= head
% capacity_
;
96 if (data_
[head_index
].SwapIfNull(ptr
))
98 auto new_head
= head
+ 1;
99 auto expected_head
= head
;
100 if (head_
.compare_exchange_weak(expected_head
, new_head
, std::memory_order_release
,
101 std::memory_order_relaxed
))
103 // free the swapped out value
109 // If we reached this point (unlikely), it means that between the last
110 // iteration elements were added and then consumed from the circular
111 // buffer, so we undo the swap and attempt to add again.
112 data_
[head_index
].Swap(ptr
);
119 * Clear the circular buffer.
121 * Note: This method must only be called from the consumer thread.
123 void Clear() noexcept
{ Consume(size()); }
126 * @return the maximum number of bytes that can be stored in the buffer.
128 size_t max_size() const noexcept
{ return capacity_
- 1; }
131 * @return true if the buffer is empty.
133 bool empty() const noexcept
{ return head_
== tail_
; }
136 * @return the number of bytes stored in the circular buffer.
138 * Note: this method will only return a correct snapshot of the size if called
139 * from the consumer thread.
141 size_t size() const noexcept
143 uint64_t tail
= tail_
;
144 uint64_t head
= head_
;
145 assert(tail
<= head
);
150 * @return the number of elements consumed from the circular buffer.
152 uint64_t consumption_count() const noexcept
{ return tail_
; }
155 * @return the number of elements added to the circular buffer.
157 uint64_t production_count() const noexcept
{ return head_
; }
160 std::unique_ptr
<AtomicUniquePtr
<T
>[]> data_
;
162 std::atomic
<uint64_t> head_
{0};
163 std::atomic
<uint64_t> tail_
{0};
165 CircularBufferRange
<AtomicUniquePtr
<T
>> PeekImpl() noexcept
167 uint64_t tail_index
= tail_
% capacity_
;
168 uint64_t head_index
= head_
% capacity_
;
169 if (head_index
== tail_index
)
173 auto data
= data_
.get();
174 if (tail_index
< head_index
)
176 return CircularBufferRange
<AtomicUniquePtr
<T
>>{nostd::span
<AtomicUniquePtr
<T
>>{
177 data
+ tail_index
, static_cast<std::size_t>(head_index
- tail_index
)}};
179 return {nostd::span
<AtomicUniquePtr
<T
>>{data
+ tail_index
,
180 static_cast<std::size_t>(capacity_
- tail_index
)},
181 nostd::span
<AtomicUniquePtr
<T
>>{data
, static_cast<std::size_t>(head_index
)}};
184 } // namespace common
186 OPENTELEMETRY_END_NAMESPACE