1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
4 #include "opentelemetry/sdk/trace/batch_span_processor.h"
5 #include "opentelemetry/sdk/trace/span_data.h"
6 #include "opentelemetry/sdk/trace/tracer.h"
8 #include <gtest/gtest.h>
12 OPENTELEMETRY_BEGIN_NAMESPACE
15 * Returns a mock span exporter meant exclusively for testing only
17 class MockSpanExporter final
: public sdk::trace::SpanExporter
21 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received
,
22 std::shared_ptr
<std::atomic
<bool>> is_shutdown
,
23 std::shared_ptr
<std::atomic
<bool>> is_export_completed
=
24 std::shared_ptr
<std::atomic
<bool>>(new std::atomic
<bool>(false)),
25 const std::chrono::milliseconds export_delay
= std::chrono::milliseconds(0)) noexcept
26 : spans_received_(spans_received
),
27 is_shutdown_(is_shutdown
),
28 is_export_completed_(is_export_completed
),
29 export_delay_(export_delay
)
32 std::unique_ptr
<sdk::trace::Recordable
> MakeRecordable() noexcept override
34 return std::unique_ptr
<sdk::trace::Recordable
>(new sdk::trace::SpanData
);
37 sdk::common::ExportResult
Export(
38 const nostd::span
<std::unique_ptr
<sdk::trace::Recordable
>> &recordables
) noexcept override
40 *is_export_completed_
= false;
42 std::this_thread::sleep_for(export_delay_
);
44 for (auto &recordable
: recordables
)
46 auto span
= std::unique_ptr
<sdk::trace::SpanData
>(
47 static_cast<sdk::trace::SpanData
*>(recordable
.release()));
51 spans_received_
->push_back(std::move(span
));
55 *is_export_completed_
= true;
56 return sdk::common::ExportResult::kSuccess
;
60 std::chrono::microseconds timeout
= std::chrono::microseconds::max()) noexcept override
66 bool IsExportCompleted() { return is_export_completed_
->load(); }
69 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received_
;
70 std::shared_ptr
<std::atomic
<bool>> is_shutdown_
;
71 std::shared_ptr
<std::atomic
<bool>> is_export_completed_
;
72 // Meant exclusively to test force flush timeout
73 const std::chrono::milliseconds export_delay_
;
79 class BatchSpanProcessorTestPeer
: public testing::Test
82 std::unique_ptr
<std::vector
<std::unique_ptr
<sdk::trace::Recordable
>>> GetTestSpans(
83 std::shared_ptr
<sdk::trace::SpanProcessor
> processor
,
86 std::unique_ptr
<std::vector
<std::unique_ptr
<sdk::trace::Recordable
>>> test_spans(
87 new std::vector
<std::unique_ptr
<sdk::trace::Recordable
>>);
89 for (int i
= 0; i
< num_spans
; ++i
)
91 test_spans
->push_back(processor
->MakeRecordable());
92 static_cast<sdk::trace::SpanData
*>(test_spans
->at(i
).get())
93 ->SetName("Span " + std::to_string(i
));
100 /* ################################## TESTS ############################################ */
102 TEST_F(BatchSpanProcessorTestPeer
, TestShutdown
)
104 std::shared_ptr
<std::atomic
<bool>> is_shutdown(new std::atomic
<bool>(false));
105 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received(
106 new std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>);
108 auto batch_processor
=
109 std::shared_ptr
<sdk::trace::BatchSpanProcessor
>(new sdk::trace::BatchSpanProcessor(
110 std::unique_ptr
<MockSpanExporter
>(new MockSpanExporter(spans_received
, is_shutdown
)),
111 sdk::trace::BatchSpanProcessorOptions()));
112 const int num_spans
= 3;
114 auto test_spans
= GetTestSpans(batch_processor
, num_spans
);
116 for (int i
= 0; i
< num_spans
; ++i
)
118 batch_processor
->OnEnd(std::move(test_spans
->at(i
)));
121 EXPECT_TRUE(batch_processor
->Shutdown());
122 // It's safe to shutdown again
123 EXPECT_TRUE(batch_processor
->Shutdown());
125 EXPECT_EQ(num_spans
, spans_received
->size());
126 for (int i
= 0; i
< num_spans
; ++i
)
128 EXPECT_EQ("Span " + std::to_string(i
), spans_received
->at(i
)->GetName());
131 EXPECT_TRUE(is_shutdown
->load());
134 TEST_F(BatchSpanProcessorTestPeer
, TestForceFlush
)
136 std::shared_ptr
<std::atomic
<bool>> is_shutdown(new std::atomic
<bool>(false));
137 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received(
138 new std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>);
140 auto batch_processor
=
141 std::shared_ptr
<sdk::trace::BatchSpanProcessor
>(new sdk::trace::BatchSpanProcessor(
142 std::unique_ptr
<MockSpanExporter
>(new MockSpanExporter(spans_received
, is_shutdown
)),
143 sdk::trace::BatchSpanProcessorOptions()));
144 const int num_spans
= 2048;
146 auto test_spans
= GetTestSpans(batch_processor
, num_spans
);
148 for (int i
= 0; i
< num_spans
; ++i
)
150 batch_processor
->OnEnd(std::move(test_spans
->at(i
)));
153 // Give some time to export
154 std::this_thread::sleep_for(std::chrono::milliseconds(50));
156 EXPECT_TRUE(batch_processor
->ForceFlush());
158 EXPECT_EQ(num_spans
, spans_received
->size());
159 for (int i
= 0; i
< num_spans
; ++i
)
161 EXPECT_EQ("Span " + std::to_string(i
), spans_received
->at(i
)->GetName());
164 // Create some more spans to make sure that the processor still works
165 auto more_test_spans
= GetTestSpans(batch_processor
, num_spans
);
166 for (int i
= 0; i
< num_spans
; ++i
)
168 batch_processor
->OnEnd(std::move(more_test_spans
->at(i
)));
171 // Give some time to export the spans
172 std::this_thread::sleep_for(std::chrono::milliseconds(50));
174 EXPECT_TRUE(batch_processor
->ForceFlush());
176 EXPECT_EQ(num_spans
* 2, spans_received
->size());
177 for (int i
= 0; i
< num_spans
; ++i
)
179 EXPECT_EQ("Span " + std::to_string(i
% num_spans
),
180 spans_received
->at(num_spans
+ i
)->GetName());
184 TEST_F(BatchSpanProcessorTestPeer
, TestManySpansLoss
)
186 /* Test that when exporting more than max_queue_size spans, some are most likely lost*/
188 std::shared_ptr
<std::atomic
<bool>> is_shutdown(new std::atomic
<bool>(false));
189 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received(
190 new std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>);
192 const int max_queue_size
= 4096;
194 auto batch_processor
=
195 std::shared_ptr
<sdk::trace::BatchSpanProcessor
>(new sdk::trace::BatchSpanProcessor(
196 std::unique_ptr
<MockSpanExporter
>(new MockSpanExporter(spans_received
, is_shutdown
)),
197 sdk::trace::BatchSpanProcessorOptions()));
199 auto test_spans
= GetTestSpans(batch_processor
, max_queue_size
);
201 for (int i
= 0; i
< max_queue_size
; ++i
)
203 batch_processor
->OnEnd(std::move(test_spans
->at(i
)));
206 // Give some time to export the spans
207 std::this_thread::sleep_for(std::chrono::milliseconds(700));
209 EXPECT_TRUE(batch_processor
->ForceFlush());
211 // Span should be exported by now
212 EXPECT_GE(max_queue_size
, spans_received
->size());
215 TEST_F(BatchSpanProcessorTestPeer
, TestManySpansLossLess
)
217 /* Test that no spans are lost when sending max_queue_size spans */
219 std::shared_ptr
<std::atomic
<bool>> is_shutdown(new std::atomic
<bool>(false));
220 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received(
221 new std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>);
223 const int num_spans
= 2048;
225 auto batch_processor
=
226 std::shared_ptr
<sdk::trace::BatchSpanProcessor
>(new sdk::trace::BatchSpanProcessor(
227 std::unique_ptr
<MockSpanExporter
>(new MockSpanExporter(spans_received
, is_shutdown
)),
228 sdk::trace::BatchSpanProcessorOptions()));
230 auto test_spans
= GetTestSpans(batch_processor
, num_spans
);
232 for (int i
= 0; i
< num_spans
; ++i
)
234 batch_processor
->OnEnd(std::move(test_spans
->at(i
)));
237 // Give some time to export the spans
238 std::this_thread::sleep_for(std::chrono::milliseconds(50));
240 EXPECT_TRUE(batch_processor
->ForceFlush());
242 EXPECT_EQ(num_spans
, spans_received
->size());
243 for (int i
= 0; i
< num_spans
; ++i
)
245 EXPECT_EQ("Span " + std::to_string(i
), spans_received
->at(i
)->GetName());
249 TEST_F(BatchSpanProcessorTestPeer
, TestScheduleDelayMillis
)
251 /* Test that max_export_batch_size spans are exported every schedule_delay_millis
254 std::shared_ptr
<std::atomic
<bool>> is_shutdown(new std::atomic
<bool>(false));
255 std::shared_ptr
<std::atomic
<bool>> is_export_completed(new std::atomic
<bool>(false));
256 std::shared_ptr
<std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>> spans_received(
257 new std::vector
<std::unique_ptr
<sdk::trace::SpanData
>>);
258 const std::chrono::milliseconds
export_delay(0);
259 const size_t max_export_batch_size
= 512;
260 sdk::trace::BatchSpanProcessorOptions options
{};
261 options
.schedule_delay_millis
= std::chrono::milliseconds(2000);
263 auto batch_processor
=
264 std::shared_ptr
<sdk::trace::BatchSpanProcessor
>(new sdk::trace::BatchSpanProcessor(
265 std::unique_ptr
<MockSpanExporter
>(
266 new MockSpanExporter(spans_received
, is_shutdown
, is_export_completed
, export_delay
)),
269 auto test_spans
= GetTestSpans(batch_processor
, max_export_batch_size
);
271 for (size_t i
= 0; i
< max_export_batch_size
; ++i
)
273 batch_processor
->OnEnd(std::move(test_spans
->at(i
)));
276 // Sleep for schedule_delay_millis milliseconds
277 std::this_thread::sleep_for(options
.schedule_delay_millis
);
279 // small delay to give time to export
280 std::this_thread::sleep_for(std::chrono::milliseconds(50));
282 // Spans should be exported by now
283 EXPECT_TRUE(is_export_completed
->load());
284 EXPECT_EQ(max_export_batch_size
, spans_received
->size());
285 for (size_t i
= 0; i
< max_export_batch_size
; ++i
)
287 EXPECT_EQ("Span " + std::to_string(i
), spans_received
->at(i
)->GetName());
291 OPENTELEMETRY_END_NAMESPACE