1 | // Copyright The OpenTelemetry Authors |
2 | // SPDX-License-Identifier: Apache-2.0 | |
3 | ||
4 | #ifdef ENABLE_LOGS_PREVIEW | |
5 | ||
6 | # include "opentelemetry/sdk/logs/batch_log_processor.h" | |
7 | # include "opentelemetry/sdk/logs/exporter.h" | |
8 | # include "opentelemetry/sdk/logs/log_record.h" | |
9 | ||
10 | # include <gtest/gtest.h> | |
11 | # include <chrono> | |
12 | # include <thread> | |
13 | ||
14 | using namespace opentelemetry::sdk::logs; | |
15 | using namespace opentelemetry::sdk::common; | |
16 | ||
17 | /** | |
18 | * A sample log exporter | |
19 | * for testing the batch log processor | |
20 | */ | |
21 | class MockLogExporter final : public LogExporter | |
22 | { | |
23 | public: | |
24 | MockLogExporter(std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received, | |
25 | std::shared_ptr<std::atomic<bool>> is_shutdown, | |
26 | std::shared_ptr<std::atomic<bool>> is_export_completed, | |
27 | const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) | |
28 | : logs_received_(logs_received), | |
29 | is_shutdown_(is_shutdown), | |
30 | is_export_completed_(is_export_completed), | |
31 | export_delay_(export_delay) | |
32 | {} | |
33 | ||
34 | std::unique_ptr<Recordable> MakeRecordable() noexcept | |
35 | { | |
36 | return std::unique_ptr<Recordable>(new LogRecord()); | |
37 | } | |
38 | ||
39 | // Export method stores the logs received into a shared list of record names | |
40 | ExportResult Export( | |
41 | const opentelemetry::nostd::span<std::unique_ptr<Recordable>> &records) noexcept override | |
42 | { | |
43 | *is_export_completed_ = false; // Meant exclusively to test scheduled_delay_millis | |
44 | ||
45 | for (auto &record : records) | |
46 | { | |
47 | auto log = std::unique_ptr<LogRecord>(static_cast<LogRecord *>(record.release())); | |
48 | if (log != nullptr) | |
49 | { | |
50 | logs_received_->push_back(std::move(log)); | |
51 | } | |
52 | } | |
53 | ||
54 | *is_export_completed_ = true; | |
55 | return ExportResult::kSuccess; | |
56 | } | |
57 | ||
58 | // toggles the boolean flag marking this exporter as shut down | |
59 | bool Shutdown( | |
60 | std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override | |
61 | { | |
62 | *is_shutdown_ = true; | |
63 | return true; | |
64 | } | |
65 | ||
66 | private: | |
67 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received_; | |
68 | std::shared_ptr<std::atomic<bool>> is_shutdown_; | |
69 | std::shared_ptr<std::atomic<bool>> is_export_completed_; | |
70 | const std::chrono::milliseconds export_delay_; | |
71 | }; | |
72 | ||
73 | /** | |
74 | * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above. | |
75 | */ | |
76 | class BatchLogProcessorTest : public testing::Test // ::testing::Test | |
77 | { | |
78 | public: | |
79 | // returns a batch log processor that received a batch of log records, a shared pointer to a | |
80 | // is_shutdown flag, and the processor configuration options (default if unspecified) | |
81 | std::shared_ptr<LogProcessor> GetMockProcessor( | |
82 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received, | |
83 | std::shared_ptr<std::atomic<bool>> is_shutdown, | |
84 | std::shared_ptr<std::atomic<bool>> is_export_completed = | |
85 | std::shared_ptr<std::atomic<bool>>(new std::atomic<bool>(false)), | |
86 | const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), | |
87 | const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), | |
88 | const size_t max_queue_size = 2048, | |
89 | const size_t max_export_batch_size = 512) | |
90 | { | |
91 | return std::shared_ptr<LogProcessor>( | |
92 | new BatchLogProcessor(std::unique_ptr<LogExporter>(new MockLogExporter( | |
93 | logs_received, is_shutdown, is_export_completed, export_delay)), | |
94 | max_queue_size, scheduled_delay_millis, max_export_batch_size)); | |
95 | } | |
96 | }; | |
97 | ||
98 | TEST_F(BatchLogProcessorTest, TestShutdown) | |
99 | { | |
100 | // initialize a batch log processor with the test exporter | |
101 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received( | |
102 | new std::vector<std::unique_ptr<LogRecord>>); | |
103 | std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false)); | |
104 | ||
105 | auto batch_processor = GetMockProcessor(logs_received, is_shutdown); | |
106 | ||
107 | // Create a few test log records and send them to the processor | |
108 | const int num_logs = 3; | |
109 | ||
110 | for (int i = 0; i < num_logs; ++i) | |
111 | { | |
112 | auto log = batch_processor->MakeRecordable(); | |
113 | log->SetBody("Log" + std::to_string(i)); | |
114 | batch_processor->OnReceive(std::move(log)); | |
115 | } | |
116 | ||
117 | // Test that shutting down the processor will first wait for the | |
118 | // current batch of logs to be sent to the log exporter | |
119 | // by checking the number of logs sent and the names of the logs sent | |
120 | EXPECT_EQ(true, batch_processor->Shutdown()); | |
121 | // It's safe to shutdown again | |
122 | EXPECT_TRUE(batch_processor->Shutdown()); | |
123 | ||
124 | EXPECT_EQ(num_logs, logs_received->size()); | |
125 | ||
126 | // Assume logs are received by exporter in same order as sent by processor | |
127 | for (int i = 0; i < num_logs; ++i) | |
128 | { | |
129 | EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); | |
130 | } | |
131 | ||
132 | // Also check that the processor is shut down at the end | |
133 | EXPECT_TRUE(is_shutdown->load()); | |
134 | } | |
135 | ||
136 | TEST_F(BatchLogProcessorTest, TestForceFlush) | |
137 | { | |
138 | std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false)); | |
139 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received( | |
140 | new std::vector<std::unique_ptr<LogRecord>>); | |
141 | ||
142 | auto batch_processor = GetMockProcessor(logs_received, is_shutdown); | |
143 | const int num_logs = 2048; | |
144 | ||
145 | for (int i = 0; i < num_logs; ++i) | |
146 | { | |
147 | auto log = batch_processor->MakeRecordable(); | |
148 | log->SetBody("Log" + std::to_string(i)); | |
149 | batch_processor->OnReceive(std::move(log)); | |
150 | } | |
151 | ||
152 | EXPECT_TRUE(batch_processor->ForceFlush()); | |
153 | ||
154 | EXPECT_EQ(num_logs, logs_received->size()); | |
155 | for (int i = 0; i < num_logs; ++i) | |
156 | { | |
157 | EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); | |
158 | } | |
159 | ||
160 | // Create some more logs to make sure that the processor still works | |
161 | for (int i = 0; i < num_logs; ++i) | |
162 | { | |
163 | auto log = batch_processor->MakeRecordable(); | |
164 | log->SetBody("Log" + std::to_string(i)); | |
165 | batch_processor->OnReceive(std::move(log)); | |
166 | } | |
167 | ||
168 | EXPECT_TRUE(batch_processor->ForceFlush()); | |
169 | ||
170 | EXPECT_EQ(num_logs * 2, logs_received->size()); | |
171 | for (int i = 0; i < num_logs * 2; ++i) | |
172 | { | |
173 | EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetBody()); | |
174 | } | |
175 | } | |
176 | ||
177 | TEST_F(BatchLogProcessorTest, TestManyLogsLoss) | |
178 | { | |
179 | /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ | |
180 | ||
181 | std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false)); | |
182 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received( | |
183 | new std::vector<std::unique_ptr<LogRecord>>); | |
184 | ||
185 | const int max_queue_size = 4096; | |
186 | ||
187 | auto batch_processor = GetMockProcessor(logs_received, is_shutdown); | |
188 | ||
189 | // Create max_queue_size log records | |
190 | for (int i = 0; i < max_queue_size; ++i) | |
191 | { | |
192 | auto log = batch_processor->MakeRecordable(); | |
193 | log->SetBody("Log" + std::to_string(i)); | |
194 | batch_processor->OnReceive(std::move(log)); | |
195 | } | |
196 | ||
197 | EXPECT_TRUE(batch_processor->ForceFlush()); | |
198 | ||
199 | // Log should be exported by now | |
200 | EXPECT_GE(max_queue_size, logs_received->size()); | |
201 | } | |
202 | ||
203 | TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) | |
204 | { | |
205 | /* Test that no logs are lost when sending max_queue_size logs */ | |
206 | ||
207 | std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false)); | |
208 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received( | |
209 | new std::vector<std::unique_ptr<LogRecord>>); | |
210 | auto batch_processor = GetMockProcessor(logs_received, is_shutdown); | |
211 | ||
212 | const int num_logs = 2048; | |
213 | ||
214 | for (int i = 0; i < num_logs; ++i) | |
215 | { | |
216 | auto log = batch_processor->MakeRecordable(); | |
217 | log->SetBody("Log" + std::to_string(i)); | |
218 | batch_processor->OnReceive(std::move(log)); | |
219 | } | |
220 | ||
221 | EXPECT_TRUE(batch_processor->ForceFlush()); | |
222 | ||
223 | EXPECT_EQ(num_logs, logs_received->size()); | |
224 | for (int i = 0; i < num_logs; ++i) | |
225 | { | |
226 | EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); | |
227 | } | |
228 | } | |
229 | ||
230 | TEST_F(BatchLogProcessorTest, TestScheduledDelayMillis) | |
231 | { | |
232 | /* Test that max_export_batch_size logs are exported every scheduled_delay_millis | |
233 | seconds */ | |
234 | ||
235 | std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false)); | |
236 | std::shared_ptr<std::atomic<bool>> is_export_completed(new std::atomic<bool>(false)); | |
237 | std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received( | |
238 | new std::vector<std::unique_ptr<LogRecord>>); | |
239 | ||
240 | const std::chrono::milliseconds export_delay(0); | |
241 | const std::chrono::milliseconds scheduled_delay_millis(2000); | |
242 | const size_t max_export_batch_size = 512; | |
243 | ||
244 | auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, | |
245 | export_delay, scheduled_delay_millis); | |
246 | ||
247 | for (std::size_t i = 0; i < max_export_batch_size; ++i) | |
248 | { | |
249 | auto log = batch_processor->MakeRecordable(); | |
250 | log->SetBody("Log" + std::to_string(i)); | |
251 | batch_processor->OnReceive(std::move(log)); | |
252 | } | |
253 | // Sleep for scheduled_delay_millis milliseconds | |
254 | std::this_thread::sleep_for(scheduled_delay_millis); | |
255 | ||
256 | // small delay to give time to export, which is being performed | |
257 | // asynchronously by the worker thread (this thread will not | |
258 | // forcibly join() the main thread unless processor's shutdown() is called). | |
259 | std::this_thread::sleep_for(std::chrono::milliseconds(50)); | |
260 | ||
261 | // Logs should be exported by now | |
262 | EXPECT_TRUE(is_export_completed->load()); | |
263 | EXPECT_EQ(max_export_batch_size, logs_received->size()); | |
264 | for (size_t i = 0; i < max_export_batch_size; ++i) | |
265 | { | |
266 | EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody()); | |
267 | } | |
268 | } | |
269 | #endif |