]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/opentelemetry-cpp/sdk/test/logs/batch_log_processor_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / jaegertracing / opentelemetry-cpp / sdk / test / logs / batch_log_processor_test.cc
CommitLineData
1e59de90
TL
1// Copyright The OpenTelemetry Authors
2// SPDX-License-Identifier: Apache-2.0
3
4#ifdef ENABLE_LOGS_PREVIEW
5
6# include "opentelemetry/sdk/logs/batch_log_processor.h"
7# include "opentelemetry/sdk/logs/exporter.h"
8# include "opentelemetry/sdk/logs/log_record.h"
9
10# include <gtest/gtest.h>
11# include <chrono>
12# include <thread>
13
14using namespace opentelemetry::sdk::logs;
15using namespace opentelemetry::sdk::common;
16
17/**
18 * A sample log exporter
19 * for testing the batch log processor
20 */
21class MockLogExporter final : public LogExporter
22{
23public:
24 MockLogExporter(std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received,
25 std::shared_ptr<std::atomic<bool>> is_shutdown,
26 std::shared_ptr<std::atomic<bool>> is_export_completed,
27 const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0))
28 : logs_received_(logs_received),
29 is_shutdown_(is_shutdown),
30 is_export_completed_(is_export_completed),
31 export_delay_(export_delay)
32 {}
33
34 std::unique_ptr<Recordable> MakeRecordable() noexcept
35 {
36 return std::unique_ptr<Recordable>(new LogRecord());
37 }
38
39 // Export method stores the logs received into a shared list of record names
40 ExportResult Export(
41 const opentelemetry::nostd::span<std::unique_ptr<Recordable>> &records) noexcept override
42 {
43 *is_export_completed_ = false; // Meant exclusively to test scheduled_delay_millis
44
45 for (auto &record : records)
46 {
47 auto log = std::unique_ptr<LogRecord>(static_cast<LogRecord *>(record.release()));
48 if (log != nullptr)
49 {
50 logs_received_->push_back(std::move(log));
51 }
52 }
53
54 *is_export_completed_ = true;
55 return ExportResult::kSuccess;
56 }
57
58 // toggles the boolean flag marking this exporter as shut down
59 bool Shutdown(
60 std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
61 {
62 *is_shutdown_ = true;
63 return true;
64 }
65
66private:
67 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received_;
68 std::shared_ptr<std::atomic<bool>> is_shutdown_;
69 std::shared_ptr<std::atomic<bool>> is_export_completed_;
70 const std::chrono::milliseconds export_delay_;
71};
72
73/**
74 * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above.
75 */
76class BatchLogProcessorTest : public testing::Test // ::testing::Test
77{
78public:
79 // returns a batch log processor that received a batch of log records, a shared pointer to a
80 // is_shutdown flag, and the processor configuration options (default if unspecified)
81 std::shared_ptr<LogProcessor> GetMockProcessor(
82 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received,
83 std::shared_ptr<std::atomic<bool>> is_shutdown,
84 std::shared_ptr<std::atomic<bool>> is_export_completed =
85 std::shared_ptr<std::atomic<bool>>(new std::atomic<bool>(false)),
86 const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0),
87 const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
88 const size_t max_queue_size = 2048,
89 const size_t max_export_batch_size = 512)
90 {
91 return std::shared_ptr<LogProcessor>(
92 new BatchLogProcessor(std::unique_ptr<LogExporter>(new MockLogExporter(
93 logs_received, is_shutdown, is_export_completed, export_delay)),
94 max_queue_size, scheduled_delay_millis, max_export_batch_size));
95 }
96};
97
98TEST_F(BatchLogProcessorTest, TestShutdown)
99{
100 // initialize a batch log processor with the test exporter
101 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
102 new std::vector<std::unique_ptr<LogRecord>>);
103 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
104
105 auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
106
107 // Create a few test log records and send them to the processor
108 const int num_logs = 3;
109
110 for (int i = 0; i < num_logs; ++i)
111 {
112 auto log = batch_processor->MakeRecordable();
113 log->SetBody("Log" + std::to_string(i));
114 batch_processor->OnReceive(std::move(log));
115 }
116
117 // Test that shutting down the processor will first wait for the
118 // current batch of logs to be sent to the log exporter
119 // by checking the number of logs sent and the names of the logs sent
120 EXPECT_EQ(true, batch_processor->Shutdown());
121 // It's safe to shutdown again
122 EXPECT_TRUE(batch_processor->Shutdown());
123
124 EXPECT_EQ(num_logs, logs_received->size());
125
126 // Assume logs are received by exporter in same order as sent by processor
127 for (int i = 0; i < num_logs; ++i)
128 {
129 EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
130 }
131
132 // Also check that the processor is shut down at the end
133 EXPECT_TRUE(is_shutdown->load());
134}
135
136TEST_F(BatchLogProcessorTest, TestForceFlush)
137{
138 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
139 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
140 new std::vector<std::unique_ptr<LogRecord>>);
141
142 auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
143 const int num_logs = 2048;
144
145 for (int i = 0; i < num_logs; ++i)
146 {
147 auto log = batch_processor->MakeRecordable();
148 log->SetBody("Log" + std::to_string(i));
149 batch_processor->OnReceive(std::move(log));
150 }
151
152 EXPECT_TRUE(batch_processor->ForceFlush());
153
154 EXPECT_EQ(num_logs, logs_received->size());
155 for (int i = 0; i < num_logs; ++i)
156 {
157 EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
158 }
159
160 // Create some more logs to make sure that the processor still works
161 for (int i = 0; i < num_logs; ++i)
162 {
163 auto log = batch_processor->MakeRecordable();
164 log->SetBody("Log" + std::to_string(i));
165 batch_processor->OnReceive(std::move(log));
166 }
167
168 EXPECT_TRUE(batch_processor->ForceFlush());
169
170 EXPECT_EQ(num_logs * 2, logs_received->size());
171 for (int i = 0; i < num_logs * 2; ++i)
172 {
173 EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetBody());
174 }
175}
176
177TEST_F(BatchLogProcessorTest, TestManyLogsLoss)
178{
179 /* Test that when exporting more than max_queue_size logs, some are most likely lost*/
180
181 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
182 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
183 new std::vector<std::unique_ptr<LogRecord>>);
184
185 const int max_queue_size = 4096;
186
187 auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
188
189 // Create max_queue_size log records
190 for (int i = 0; i < max_queue_size; ++i)
191 {
192 auto log = batch_processor->MakeRecordable();
193 log->SetBody("Log" + std::to_string(i));
194 batch_processor->OnReceive(std::move(log));
195 }
196
197 EXPECT_TRUE(batch_processor->ForceFlush());
198
199 // Log should be exported by now
200 EXPECT_GE(max_queue_size, logs_received->size());
201}
202
203TEST_F(BatchLogProcessorTest, TestManyLogsLossLess)
204{
205 /* Test that no logs are lost when sending max_queue_size logs */
206
207 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
208 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
209 new std::vector<std::unique_ptr<LogRecord>>);
210 auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
211
212 const int num_logs = 2048;
213
214 for (int i = 0; i < num_logs; ++i)
215 {
216 auto log = batch_processor->MakeRecordable();
217 log->SetBody("Log" + std::to_string(i));
218 batch_processor->OnReceive(std::move(log));
219 }
220
221 EXPECT_TRUE(batch_processor->ForceFlush());
222
223 EXPECT_EQ(num_logs, logs_received->size());
224 for (int i = 0; i < num_logs; ++i)
225 {
226 EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
227 }
228}
229
230TEST_F(BatchLogProcessorTest, TestScheduledDelayMillis)
231{
232 /* Test that max_export_batch_size logs are exported every scheduled_delay_millis
233 seconds */
234
235 std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
236 std::shared_ptr<std::atomic<bool>> is_export_completed(new std::atomic<bool>(false));
237 std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
238 new std::vector<std::unique_ptr<LogRecord>>);
239
240 const std::chrono::milliseconds export_delay(0);
241 const std::chrono::milliseconds scheduled_delay_millis(2000);
242 const size_t max_export_batch_size = 512;
243
244 auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed,
245 export_delay, scheduled_delay_millis);
246
247 for (std::size_t i = 0; i < max_export_batch_size; ++i)
248 {
249 auto log = batch_processor->MakeRecordable();
250 log->SetBody("Log" + std::to_string(i));
251 batch_processor->OnReceive(std::move(log));
252 }
253 // Sleep for scheduled_delay_millis milliseconds
254 std::this_thread::sleep_for(scheduled_delay_millis);
255
256 // small delay to give time to export, which is being performed
257 // asynchronously by the worker thread (this thread will not
258 // forcibly join() the main thread unless processor's shutdown() is called).
259 std::this_thread::sleep_for(std::chrono::milliseconds(50));
260
261 // Logs should be exported by now
262 EXPECT_TRUE(is_export_completed->load());
263 EXPECT_EQ(max_export_batch_size, logs_received->size());
264 for (size_t i = 0; i < max_export_batch_size; ++i)
265 {
266 EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
267 }
268}
269#endif