]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/opentelemetry-cpp/sdk/include/opentelemetry/sdk/common/circular_buffer.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / jaegertracing / opentelemetry-cpp / sdk / include / opentelemetry / sdk / common / circular_buffer.h
1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
3
4 #pragma once
5
6 #include <atomic>
7 #include <cstdint>
8 #include <memory>
9
10 #include "opentelemetry/sdk/common/atomic_unique_ptr.h"
11 #include "opentelemetry/sdk/common/circular_buffer_range.h"
12 #include "opentelemetry/version.h"
13
14 OPENTELEMETRY_BEGIN_NAMESPACE
15 namespace sdk
16 {
17 namespace common
18 {
19 /*
20 * A lock-free circular buffer that supports multiple concurrent producers
21 * and a single consumer.
22 */
23 template <class T>
24 class CircularBuffer
25 {
26 public:
27 explicit CircularBuffer(size_t max_size)
28 : data_{new AtomicUniquePtr<T>[max_size + 1]}, capacity_{max_size + 1}
29 {}
30
31 /**
32 * @return a range of the elements in the circular buffer
33 *
34 * Note: This method must only be called from the consumer thread.
35 */
36 CircularBufferRange<const AtomicUniquePtr<T>> Peek() const noexcept
37 {
38 return const_cast<CircularBuffer *>(this)->PeekImpl();
39 }
40
41 /**
42 * Consume elements from the circular buffer's tail.
43 * @param n the number of elements to consume
44 * @param callback the callback to invoke with an AtomicUniquePtr to each
45 * consumed element.
46 *
47 * Note: The callback must set the passed AtomicUniquePtr to null.
48 *
49 * Note: This method must only be called from the consumer thread.
50 */
51 template <class Callback>
52 void Consume(size_t n, Callback callback) noexcept
53 {
54 assert(n <= static_cast<size_t>(head_ - tail_));
55 auto range = PeekImpl().Take(n);
56 static_assert(noexcept(callback(range)), "callback not allowed to throw");
57 tail_ += n;
58 callback(range);
59 }
60
61 /**
62 * Consume elements from the circular buffer's tail.
63 * @param n the number of elements to consume
64 *
65 * Note: This method must only be called from the consumer thread.
66 */
67 void Consume(size_t n) noexcept
68 {
69 Consume(n, [](CircularBufferRange<AtomicUniquePtr<T>> &range) noexcept {
70 range.ForEach([](AtomicUniquePtr<T> &ptr) noexcept {
71 ptr.Reset();
72 return true;
73 });
74 });
75 }
76
77 /**
78 * Adds an element into the circular buffer.
79 * @param ptr a pointer to the element to add
80 * @return true if the element was successfully added; false, otherwise.
81 */
82 bool Add(std::unique_ptr<T> &ptr) noexcept
83 {
84 while (true)
85 {
86 uint64_t tail = tail_;
87 uint64_t head = head_;
88
89 // The circular buffer is full, so return false.
90 if (head - tail >= capacity_ - 1)
91 {
92 return false;
93 }
94
95 uint64_t head_index = head % capacity_;
96 if (data_[head_index].SwapIfNull(ptr))
97 {
98 auto new_head = head + 1;
99 auto expected_head = head;
100 if (head_.compare_exchange_weak(expected_head, new_head, std::memory_order_release,
101 std::memory_order_relaxed))
102 {
103 // free the swapped out value
104 ptr.reset();
105
106 return true;
107 }
108
109 // If we reached this point (unlikely), it means that between the last
110 // iteration elements were added and then consumed from the circular
111 // buffer, so we undo the swap and attempt to add again.
112 data_[head_index].Swap(ptr);
113 }
114 }
115 return true;
116 }
117
118 /**
119 * Clear the circular buffer.
120 *
121 * Note: This method must only be called from the consumer thread.
122 */
123 void Clear() noexcept { Consume(size()); }
124
125 /**
126 * @return the maximum number of bytes that can be stored in the buffer.
127 */
128 size_t max_size() const noexcept { return capacity_ - 1; }
129
130 /**
131 * @return true if the buffer is empty.
132 */
133 bool empty() const noexcept { return head_ == tail_; }
134
135 /**
136 * @return the number of bytes stored in the circular buffer.
137 *
138 * Note: this method will only return a correct snapshot of the size if called
139 * from the consumer thread.
140 */
141 size_t size() const noexcept
142 {
143 uint64_t tail = tail_;
144 uint64_t head = head_;
145 assert(tail <= head);
146 return head - tail;
147 }
148
149 /**
150 * @return the number of elements consumed from the circular buffer.
151 */
152 uint64_t consumption_count() const noexcept { return tail_; }
153
154 /**
155 * @return the number of elements added to the circular buffer.
156 */
157 uint64_t production_count() const noexcept { return head_; }
158
159 private:
160 std::unique_ptr<AtomicUniquePtr<T>[]> data_;
161 size_t capacity_;
162 std::atomic<uint64_t> head_{0};
163 std::atomic<uint64_t> tail_{0};
164
165 CircularBufferRange<AtomicUniquePtr<T>> PeekImpl() noexcept
166 {
167 uint64_t tail_index = tail_ % capacity_;
168 uint64_t head_index = head_ % capacity_;
169 if (head_index == tail_index)
170 {
171 return {};
172 }
173 auto data = data_.get();
174 if (tail_index < head_index)
175 {
176 return CircularBufferRange<AtomicUniquePtr<T>>{nostd::span<AtomicUniquePtr<T>>{
177 data + tail_index, static_cast<std::size_t>(head_index - tail_index)}};
178 }
179 return {nostd::span<AtomicUniquePtr<T>>{data + tail_index,
180 static_cast<std::size_t>(capacity_ - tail_index)},
181 nostd::span<AtomicUniquePtr<T>>{data, static_cast<std::size_t>(head_index)}};
182 }
183 };
184 } // namespace common
185 } // namespace sdk
186 OPENTELEMETRY_END_NAMESPACE