]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/opentelemetry-cpp/sdk/test/trace/batch_span_processor_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / jaegertracing / opentelemetry-cpp / sdk / test / trace / batch_span_processor_test.cc
1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
3
4 #include "opentelemetry/sdk/trace/batch_span_processor.h"
5 #include "opentelemetry/sdk/trace/span_data.h"
6 #include "opentelemetry/sdk/trace/tracer.h"
7
8 #include <gtest/gtest.h>
9 #include <chrono>
10 #include <thread>
11
12 OPENTELEMETRY_BEGIN_NAMESPACE
13
14 /**
15 * Returns a mock span exporter meant exclusively for testing only
16 */
17 class MockSpanExporter final : public sdk::trace::SpanExporter
18 {
19 public:
20 MockSpanExporter(
21 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
22 std::shared_ptr<std::atomic<bool>> is_shutdown,
23 std::shared_ptr<std::atomic<bool>> is_export_completed =
24 std::shared_ptr<std::atomic<bool>>(new std::atomic<bool>(false)),
25 const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept
26 : spans_received_(spans_received),
27 is_shutdown_(is_shutdown),
28 is_export_completed_(is_export_completed),
29 export_delay_(export_delay)
30 {}
31
32 std::unique_ptr<sdk::trace::Recordable> MakeRecordable() noexcept override
33 {
34 return std::unique_ptr<sdk::trace::Recordable>(new sdk::trace::SpanData);
35 }
36
37 sdk::common::ExportResult Export(
38 const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
39 {
40 *is_export_completed_ = false;
41
42 std::this_thread::sleep_for(export_delay_);
43
44 for (auto &recordable : recordables)
45 {
46 auto span = std::unique_ptr<sdk::trace::SpanData>(
47 static_cast<sdk::trace::SpanData *>(recordable.release()));
48
49 if (span != nullptr)
50 {
51 spans_received_->push_back(std::move(span));
52 }
53 }
54
55 *is_export_completed_ = true;
56 return sdk::common::ExportResult::kSuccess;
57 }
58
59 bool Shutdown(
60 std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
61 {
62 *is_shutdown_ = true;
63 return true;
64 }
65
66 bool IsExportCompleted() { return is_export_completed_->load(); }
67
68 private:
69 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received_;
70 std::shared_ptr<std::atomic<bool>> is_shutdown_;
71 std::shared_ptr<std::atomic<bool>> is_export_completed_;
72 // Meant exclusively to test force flush timeout
73 const std::chrono::milliseconds export_delay_;
74 };
75
76 /**
77 * Fixture Class
78 */
79 class BatchSpanProcessorTestPeer : public testing::Test
80 {
81 public:
82 std::unique_ptr<std::vector<std::unique_ptr<sdk::trace::Recordable>>> GetTestSpans(
83 std::shared_ptr<sdk::trace::SpanProcessor> processor,
84 const int num_spans)
85 {
86 std::unique_ptr<std::vector<std::unique_ptr<sdk::trace::Recordable>>> test_spans(
87 new std::vector<std::unique_ptr<sdk::trace::Recordable>>);
88
89 for (int i = 0; i < num_spans; ++i)
90 {
91 test_spans->push_back(processor->MakeRecordable());
92 static_cast<sdk::trace::SpanData *>(test_spans->at(i).get())
93 ->SetName("Span " + std::to_string(i));
94 }
95
96 return test_spans;
97 }
98 };
99
100 /* ################################## TESTS ############################################ */
101
102 TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
103 {
104 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
105 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
106 new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
107
108 auto batch_processor =
109 std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
110 std::unique_ptr<MockSpanExporter>(new MockSpanExporter(spans_received, is_shutdown)),
111 sdk::trace::BatchSpanProcessorOptions()));
112 const int num_spans = 3;
113
114 auto test_spans = GetTestSpans(batch_processor, num_spans);
115
116 for (int i = 0; i < num_spans; ++i)
117 {
118 batch_processor->OnEnd(std::move(test_spans->at(i)));
119 }
120
121 EXPECT_TRUE(batch_processor->Shutdown());
122 // It's safe to shutdown again
123 EXPECT_TRUE(batch_processor->Shutdown());
124
125 EXPECT_EQ(num_spans, spans_received->size());
126 for (int i = 0; i < num_spans; ++i)
127 {
128 EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
129 }
130
131 EXPECT_TRUE(is_shutdown->load());
132 }
133
134 TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
135 {
136 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
137 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
138 new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
139
140 auto batch_processor =
141 std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
142 std::unique_ptr<MockSpanExporter>(new MockSpanExporter(spans_received, is_shutdown)),
143 sdk::trace::BatchSpanProcessorOptions()));
144 const int num_spans = 2048;
145
146 auto test_spans = GetTestSpans(batch_processor, num_spans);
147
148 for (int i = 0; i < num_spans; ++i)
149 {
150 batch_processor->OnEnd(std::move(test_spans->at(i)));
151 }
152
153 // Give some time to export
154 std::this_thread::sleep_for(std::chrono::milliseconds(50));
155
156 EXPECT_TRUE(batch_processor->ForceFlush());
157
158 EXPECT_EQ(num_spans, spans_received->size());
159 for (int i = 0; i < num_spans; ++i)
160 {
161 EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
162 }
163
164 // Create some more spans to make sure that the processor still works
165 auto more_test_spans = GetTestSpans(batch_processor, num_spans);
166 for (int i = 0; i < num_spans; ++i)
167 {
168 batch_processor->OnEnd(std::move(more_test_spans->at(i)));
169 }
170
171 // Give some time to export the spans
172 std::this_thread::sleep_for(std::chrono::milliseconds(50));
173
174 EXPECT_TRUE(batch_processor->ForceFlush());
175
176 EXPECT_EQ(num_spans * 2, spans_received->size());
177 for (int i = 0; i < num_spans; ++i)
178 {
179 EXPECT_EQ("Span " + std::to_string(i % num_spans),
180 spans_received->at(num_spans + i)->GetName());
181 }
182 }
183
184 TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
185 {
186 /* Test that when exporting more than max_queue_size spans, some are most likely lost*/
187
188 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
189 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
190 new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
191
192 const int max_queue_size = 4096;
193
194 auto batch_processor =
195 std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
196 std::unique_ptr<MockSpanExporter>(new MockSpanExporter(spans_received, is_shutdown)),
197 sdk::trace::BatchSpanProcessorOptions()));
198
199 auto test_spans = GetTestSpans(batch_processor, max_queue_size);
200
201 for (int i = 0; i < max_queue_size; ++i)
202 {
203 batch_processor->OnEnd(std::move(test_spans->at(i)));
204 }
205
206 // Give some time to export the spans
207 std::this_thread::sleep_for(std::chrono::milliseconds(700));
208
209 EXPECT_TRUE(batch_processor->ForceFlush());
210
211 // Span should be exported by now
212 EXPECT_GE(max_queue_size, spans_received->size());
213 }
214
215 TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess)
216 {
217 /* Test that no spans are lost when sending max_queue_size spans */
218
219 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
220 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
221 new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
222
223 const int num_spans = 2048;
224
225 auto batch_processor =
226 std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
227 std::unique_ptr<MockSpanExporter>(new MockSpanExporter(spans_received, is_shutdown)),
228 sdk::trace::BatchSpanProcessorOptions()));
229
230 auto test_spans = GetTestSpans(batch_processor, num_spans);
231
232 for (int i = 0; i < num_spans; ++i)
233 {
234 batch_processor->OnEnd(std::move(test_spans->at(i)));
235 }
236
237 // Give some time to export the spans
238 std::this_thread::sleep_for(std::chrono::milliseconds(50));
239
240 EXPECT_TRUE(batch_processor->ForceFlush());
241
242 EXPECT_EQ(num_spans, spans_received->size());
243 for (int i = 0; i < num_spans; ++i)
244 {
245 EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
246 }
247 }
248
249 TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
250 {
251 /* Test that max_export_batch_size spans are exported every schedule_delay_millis
252 seconds */
253
254 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
255 std::shared_ptr<std::atomic<bool>> is_export_completed(new std::atomic<bool>(false));
256 std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
257 new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
258 const std::chrono::milliseconds export_delay(0);
259 const size_t max_export_batch_size = 512;
260 sdk::trace::BatchSpanProcessorOptions options{};
261 options.schedule_delay_millis = std::chrono::milliseconds(2000);
262
263 auto batch_processor =
264 std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
265 std::unique_ptr<MockSpanExporter>(
266 new MockSpanExporter(spans_received, is_shutdown, is_export_completed, export_delay)),
267 options));
268
269 auto test_spans = GetTestSpans(batch_processor, max_export_batch_size);
270
271 for (size_t i = 0; i < max_export_batch_size; ++i)
272 {
273 batch_processor->OnEnd(std::move(test_spans->at(i)));
274 }
275
276 // Sleep for schedule_delay_millis milliseconds
277 std::this_thread::sleep_for(options.schedule_delay_millis);
278
279 // small delay to give time to export
280 std::this_thread::sleep_for(std::chrono::milliseconds(50));
281
282 // Spans should be exported by now
283 EXPECT_TRUE(is_export_completed->load());
284 EXPECT_EQ(max_export_batch_size, spans_received->size());
285 for (size_t i = 0; i < max_export_batch_size; ++i)
286 {
287 EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
288 }
289 }
290
291 OPENTELEMETRY_END_NAMESPACE