]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/log_reader.h" | |
11 | #include "db/log_writer.h" | |
f67539c2 TL |
12 | #include "file/sequence_file_reader.h" |
13 | #include "file/writable_file_writer.h" | |
7c673cae | 14 | #include "rocksdb/env.h" |
f67539c2 TL |
15 | #include "test_util/testharness.h" |
16 | #include "test_util/testutil.h" | |
7c673cae FG |
17 | #include "util/coding.h" |
18 | #include "util/crc32c.h" | |
7c673cae | 19 | #include "util/random.h" |
1e59de90 | 20 | #include "utilities/memory_allocators.h" |
7c673cae | 21 | |
f67539c2 | 22 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
23 | namespace log { |
24 | ||
25 | // Construct a string of the specified length made out of the supplied | |
26 | // partial string. | |
// Construct a string of exactly `n` bytes by repeating `partial_string`,
// truncating the final repetition if it does not fit.
//
// An empty `partial_string` cannot be repeated into a non-empty result; in
// that case the result is `n` NUL bytes (what std::string::resize() would
// have padded with) instead of looping forever.
static std::string BigString(const std::string& partial_string, size_t n) {
  if (partial_string.empty()) {
    return std::string(n, '\0');
  }
  std::string result;
  result.reserve(n);
  while (result.size() < n) {
    // append(str, pos, len) clamps `len` to the length of `str`, so the last
    // iteration copies only the bytes that are still missing.
    result.append(partial_string, 0, n - result.size());
  }
  return result;
}
35 | ||
36 | // Construct a string from a number | |
// Construct a string from a number: the decimal representation of `n`
// followed by a '.' (e.g. 42 -> "42.").
static std::string NumberString(int n) {
  // std::to_string yields the same text as "%d" without a fixed-size buffer.
  return std::to_string(n) + ".";
}
42 | ||
43 | // Return a skewed potentially long string | |
44 | static std::string RandomSkewedString(int i, Random* rnd) { | |
45 | return BigString(NumberString(i), rnd->Skewed(17)); | |
46 | } | |
47 | ||
494da23a TL |
48 | // Param type is tuple<int, bool> |
49 | // get<0>(tuple): non-zero if recycling log, zero if regular log | |
50 | // get<1>(tuple): true if allow retry after read EOF, false otherwise | |
1e59de90 TL |
51 | class LogTest |
52 | : public ::testing::TestWithParam<std::tuple<int, bool, CompressionType>> { | |
7c673cae | 53 | private: |
1e59de90 | 54 | class StringSource : public FSSequentialFile { |
7c673cae FG |
55 | public: |
56 | Slice& contents_; | |
57 | bool force_error_; | |
58 | size_t force_error_position_; | |
59 | bool force_eof_; | |
60 | size_t force_eof_position_; | |
61 | bool returned_partial_; | |
494da23a TL |
62 | bool fail_after_read_partial_; |
63 | explicit StringSource(Slice& contents, bool fail_after_read_partial) | |
64 | : contents_(contents), | |
65 | force_error_(false), | |
66 | force_error_position_(0), | |
67 | force_eof_(false), | |
68 | force_eof_position_(0), | |
69 | returned_partial_(false), | |
70 | fail_after_read_partial_(fail_after_read_partial) {} | |
71 | ||
1e59de90 TL |
72 | IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result, |
73 | char* scratch, IODebugContext* /*dbg*/) override { | |
494da23a TL |
74 | if (fail_after_read_partial_) { |
75 | EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; | |
76 | } | |
7c673cae FG |
77 | |
78 | if (force_error_) { | |
79 | if (force_error_position_ >= n) { | |
80 | force_error_position_ -= n; | |
81 | } else { | |
82 | *result = Slice(contents_.data(), force_error_position_); | |
83 | contents_.remove_prefix(force_error_position_); | |
84 | force_error_ = false; | |
85 | returned_partial_ = true; | |
1e59de90 | 86 | return IOStatus::Corruption("read error"); |
7c673cae FG |
87 | } |
88 | } | |
89 | ||
90 | if (contents_.size() < n) { | |
91 | n = contents_.size(); | |
92 | returned_partial_ = true; | |
93 | } | |
94 | ||
95 | if (force_eof_) { | |
96 | if (force_eof_position_ >= n) { | |
97 | force_eof_position_ -= n; | |
98 | } else { | |
99 | force_eof_ = false; | |
100 | n = force_eof_position_; | |
101 | returned_partial_ = true; | |
102 | } | |
103 | } | |
104 | ||
105 | // By using scratch we ensure that caller has control over the | |
106 | // lifetime of result.data() | |
107 | memcpy(scratch, contents_.data(), n); | |
108 | *result = Slice(scratch, n); | |
109 | ||
110 | contents_.remove_prefix(n); | |
1e59de90 | 111 | return IOStatus::OK(); |
7c673cae FG |
112 | } |
113 | ||
1e59de90 | 114 | IOStatus Skip(uint64_t n) override { |
7c673cae FG |
115 | if (n > contents_.size()) { |
116 | contents_.clear(); | |
1e59de90 | 117 | return IOStatus::NotFound("in-memory file skipepd past end"); |
7c673cae FG |
118 | } |
119 | ||
120 | contents_.remove_prefix(n); | |
121 | ||
1e59de90 | 122 | return IOStatus::OK(); |
7c673cae FG |
123 | } |
124 | }; | |
125 | ||
126 | class ReportCollector : public Reader::Reporter { | |
127 | public: | |
128 | size_t dropped_bytes_; | |
129 | std::string message_; | |
130 | ||
1e59de90 | 131 | ReportCollector() : dropped_bytes_(0) {} |
494da23a | 132 | void Corruption(size_t bytes, const Status& status) override { |
7c673cae FG |
133 | dropped_bytes_ += bytes; |
134 | message_.append(status.ToString()); | |
135 | } | |
136 | }; | |
137 | ||
1e59de90 | 138 | std::string& dest_contents() { return sink_->contents_; } |
7c673cae | 139 | |
1e59de90 | 140 | const std::string& dest_contents() const { return sink_->contents_; } |
7c673cae | 141 | |
1e59de90 | 142 | void reset_source_contents() { source_->contents_ = dest_contents(); } |
7c673cae FG |
143 | |
144 | Slice reader_contents_; | |
1e59de90 TL |
145 | test::StringSink* sink_; |
146 | StringSource* source_; | |
7c673cae | 147 | ReportCollector report_; |
7c673cae | 148 | |
494da23a | 149 | protected: |
1e59de90 TL |
150 | std::unique_ptr<Writer> writer_; |
151 | std::unique_ptr<Reader> reader_; | |
494da23a | 152 | bool allow_retry_read_; |
1e59de90 | 153 | CompressionType compression_type_; |
7c673cae FG |
154 | |
155 | public: | |
156 | LogTest() | |
157 | : reader_contents_(), | |
1e59de90 TL |
158 | sink_(new test::StringSink(&reader_contents_)), |
159 | source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))), | |
160 | allow_retry_read_(std::get<1>(GetParam())), | |
161 | compression_type_(std::get<2>(GetParam())) { | |
162 | std::unique_ptr<FSWritableFile> sink_holder(sink_); | |
163 | std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( | |
164 | std::move(sink_holder), "" /* don't care */, FileOptions())); | |
165 | Writer* writer = | |
166 | new Writer(std::move(file_writer), 123, std::get<0>(GetParam()), false, | |
167 | compression_type_); | |
168 | writer_.reset(writer); | |
169 | std::unique_ptr<FSSequentialFile> source_holder(source_); | |
170 | std::unique_ptr<SequentialFileReader> file_reader( | |
171 | new SequentialFileReader(std::move(source_holder), "" /* file name */)); | |
494da23a | 172 | if (allow_retry_read_) { |
1e59de90 TL |
173 | reader_.reset(new FragmentBufferedReader(nullptr, std::move(file_reader), |
174 | &report_, true /* checksum */, | |
175 | 123 /* log_number */)); | |
494da23a | 176 | } else { |
1e59de90 | 177 | reader_.reset(new Reader(nullptr, std::move(file_reader), &report_, |
494da23a TL |
178 | true /* checksum */, 123 /* log_number */)); |
179 | } | |
7c673cae FG |
180 | } |
181 | ||
182 | Slice* get_reader_contents() { return &reader_contents_; } | |
183 | ||
184 | void Write(const std::string& msg) { | |
1e59de90 | 185 | ASSERT_OK(writer_->AddRecord(Slice(msg))); |
7c673cae FG |
186 | } |
187 | ||
1e59de90 | 188 | size_t WrittenBytes() const { return dest_contents().size(); } |
7c673cae FG |
189 | |
190 | std::string Read(const WALRecoveryMode wal_recovery_mode = | |
191 | WALRecoveryMode::kTolerateCorruptedTailRecords) { | |
192 | std::string scratch; | |
193 | Slice record; | |
494da23a | 194 | bool ret = false; |
1e59de90 TL |
195 | uint64_t record_checksum; |
196 | ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode, | |
197 | &record_checksum); | |
494da23a | 198 | if (ret) { |
1e59de90 TL |
199 | if (!allow_retry_read_) { |
200 | // allow_retry_read_ means using FragmentBufferedReader which does not | |
201 | // support record checksum yet. | |
202 | uint64_t actual_record_checksum = | |
203 | XXH3_64bits(record.data(), record.size()); | |
204 | assert(actual_record_checksum == record_checksum); | |
205 | } | |
7c673cae FG |
206 | return record.ToString(); |
207 | } else { | |
208 | return "EOF"; | |
209 | } | |
210 | } | |
211 | ||
11fdf7f2 | 212 | void IncrementByte(int offset, char delta) { |
7c673cae FG |
213 | dest_contents()[offset] += delta; |
214 | } | |
215 | ||
216 | void SetByte(int offset, char new_byte) { | |
217 | dest_contents()[offset] = new_byte; | |
218 | } | |
219 | ||
1e59de90 | 220 | void ShrinkSize(int bytes) { sink_->Drop(bytes); } |
7c673cae FG |
221 | |
222 | void FixChecksum(int header_offset, int len, bool recyclable) { | |
223 | // Compute crc of type/len/data | |
224 | int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize; | |
225 | uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6], | |
226 | header_size - 6 + len); | |
227 | crc = crc32c::Mask(crc); | |
228 | EncodeFixed32(&dest_contents()[header_offset], crc); | |
229 | } | |
230 | ||
231 | void ForceError(size_t position = 0) { | |
1e59de90 TL |
232 | source_->force_error_ = true; |
233 | source_->force_error_position_ = position; | |
7c673cae FG |
234 | } |
235 | ||
1e59de90 | 236 | size_t DroppedBytes() const { return report_.dropped_bytes_; } |
7c673cae | 237 | |
1e59de90 | 238 | std::string ReportMessage() const { return report_.message_; } |
7c673cae FG |
239 | |
240 | void ForceEOF(size_t position = 0) { | |
1e59de90 TL |
241 | source_->force_eof_ = true; |
242 | source_->force_eof_position_ = position; | |
7c673cae FG |
243 | } |
244 | ||
245 | void UnmarkEOF() { | |
1e59de90 | 246 | source_->returned_partial_ = false; |
494da23a | 247 | reader_->UnmarkEOF(); |
7c673cae FG |
248 | } |
249 | ||
494da23a | 250 | bool IsEOF() { return reader_->IsEOF(); } |
7c673cae FG |
251 | |
252 | // Returns OK iff recorded error message contains "msg" | |
253 | std::string MatchError(const std::string& msg) const { | |
254 | if (report_.message_.find(msg) == std::string::npos) { | |
255 | return report_.message_; | |
256 | } else { | |
257 | return "OK"; | |
258 | } | |
259 | } | |
7c673cae FG |
260 | }; |
261 | ||
7c673cae FG |
262 | TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } |
263 | ||
264 | TEST_P(LogTest, ReadWrite) { | |
265 | Write("foo"); | |
266 | Write("bar"); | |
267 | Write(""); | |
268 | Write("xxxx"); | |
269 | ASSERT_EQ("foo", Read()); | |
270 | ASSERT_EQ("bar", Read()); | |
271 | ASSERT_EQ("", Read()); | |
272 | ASSERT_EQ("xxxx", Read()); | |
273 | ASSERT_EQ("EOF", Read()); | |
274 | ASSERT_EQ("EOF", Read()); // Make sure reads at eof work | |
275 | } | |
276 | ||
277 | TEST_P(LogTest, ManyBlocks) { | |
278 | for (int i = 0; i < 100000; i++) { | |
279 | Write(NumberString(i)); | |
280 | } | |
281 | for (int i = 0; i < 100000; i++) { | |
282 | ASSERT_EQ(NumberString(i), Read()); | |
283 | } | |
284 | ASSERT_EQ("EOF", Read()); | |
285 | } | |
286 | ||
287 | TEST_P(LogTest, Fragmentation) { | |
288 | Write("small"); | |
289 | Write(BigString("medium", 50000)); | |
290 | Write(BigString("large", 100000)); | |
291 | ASSERT_EQ("small", Read()); | |
292 | ASSERT_EQ(BigString("medium", 50000), Read()); | |
293 | ASSERT_EQ(BigString("large", 100000), Read()); | |
294 | ASSERT_EQ("EOF", Read()); | |
295 | } | |
296 | ||
297 | TEST_P(LogTest, MarginalTrailer) { | |
298 | // Make a trailer that is exactly the same length as an empty record. | |
494da23a TL |
299 | int header_size = |
300 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
301 | const int n = kBlockSize - 2 * header_size; |
302 | Write(BigString("foo", n)); | |
303 | ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); | |
304 | Write(""); | |
305 | Write("bar"); | |
306 | ASSERT_EQ(BigString("foo", n), Read()); | |
307 | ASSERT_EQ("", Read()); | |
308 | ASSERT_EQ("bar", Read()); | |
309 | ASSERT_EQ("EOF", Read()); | |
310 | } | |
311 | ||
312 | TEST_P(LogTest, MarginalTrailer2) { | |
313 | // Make a trailer that is exactly the same length as an empty record. | |
494da23a TL |
314 | int header_size = |
315 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
316 | const int n = kBlockSize - 2 * header_size; |
317 | Write(BigString("foo", n)); | |
318 | ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); | |
319 | Write("bar"); | |
320 | ASSERT_EQ(BigString("foo", n), Read()); | |
321 | ASSERT_EQ("bar", Read()); | |
322 | ASSERT_EQ("EOF", Read()); | |
323 | ASSERT_EQ(0U, DroppedBytes()); | |
324 | ASSERT_EQ("", ReportMessage()); | |
325 | } | |
326 | ||
327 | TEST_P(LogTest, ShortTrailer) { | |
494da23a TL |
328 | int header_size = |
329 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
330 | const int n = kBlockSize - 2 * header_size + 4; |
331 | Write(BigString("foo", n)); | |
332 | ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); | |
333 | Write(""); | |
334 | Write("bar"); | |
335 | ASSERT_EQ(BigString("foo", n), Read()); | |
336 | ASSERT_EQ("", Read()); | |
337 | ASSERT_EQ("bar", Read()); | |
338 | ASSERT_EQ("EOF", Read()); | |
339 | } | |
340 | ||
341 | TEST_P(LogTest, AlignedEof) { | |
494da23a TL |
342 | int header_size = |
343 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
344 | const int n = kBlockSize - 2 * header_size + 4; |
345 | Write(BigString("foo", n)); | |
346 | ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); | |
347 | ASSERT_EQ(BigString("foo", n), Read()); | |
348 | ASSERT_EQ("EOF", Read()); | |
349 | } | |
350 | ||
351 | TEST_P(LogTest, RandomRead) { | |
352 | const int N = 500; | |
353 | Random write_rnd(301); | |
354 | for (int i = 0; i < N; i++) { | |
355 | Write(RandomSkewedString(i, &write_rnd)); | |
356 | } | |
357 | Random read_rnd(301); | |
358 | for (int i = 0; i < N; i++) { | |
359 | ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read()); | |
360 | } | |
361 | ASSERT_EQ("EOF", Read()); | |
362 | } | |
363 | ||
364 | // Tests of all the error paths in log_reader.cc follow: | |
365 | ||
366 | TEST_P(LogTest, ReadError) { | |
367 | Write("foo"); | |
368 | ForceError(); | |
369 | ASSERT_EQ("EOF", Read()); | |
370 | ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes()); | |
371 | ASSERT_EQ("OK", MatchError("read error")); | |
372 | } | |
373 | ||
374 | TEST_P(LogTest, BadRecordType) { | |
375 | Write("foo"); | |
376 | // Type is stored in header[6] | |
377 | IncrementByte(6, 100); | |
378 | FixChecksum(0, 3, false); | |
379 | ASSERT_EQ("EOF", Read()); | |
380 | ASSERT_EQ(3U, DroppedBytes()); | |
381 | ASSERT_EQ("OK", MatchError("unknown record type")); | |
382 | } | |
383 | ||
384 | TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { | |
385 | Write("foo"); | |
1e59de90 | 386 | ShrinkSize(4); // Drop all payload as well as a header byte |
7c673cae FG |
387 | ASSERT_EQ("EOF", Read()); |
388 | // Truncated last record is ignored, not treated as an error | |
389 | ASSERT_EQ(0U, DroppedBytes()); | |
390 | ASSERT_EQ("", ReportMessage()); | |
391 | } | |
392 | ||
393 | TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { | |
494da23a TL |
394 | if (allow_retry_read_) { |
395 | // If read retry is allowed, then truncated trailing record should not | |
396 | // raise an error. | |
397 | return; | |
398 | } | |
7c673cae FG |
399 | Write("foo"); |
400 | ShrinkSize(4); // Drop all payload as well as a header byte | |
401 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
402 | // Truncated last record is ignored, not treated as an error | |
403 | ASSERT_GT(DroppedBytes(), 0U); | |
404 | ASSERT_EQ("OK", MatchError("Corruption: truncated header")); | |
405 | } | |
406 | ||
407 | TEST_P(LogTest, BadLength) { | |
494da23a TL |
408 | if (allow_retry_read_) { |
409 | // If read retry is allowed, then we should not raise an error when the | |
410 | // record length specified in header is longer than data currently | |
411 | // available. It's possible that the body of the record is not written yet. | |
412 | return; | |
413 | } | |
414 | bool recyclable_log = (std::get<0>(GetParam()) != 0); | |
415 | int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
416 | const int kPayloadSize = kBlockSize - header_size; |
417 | Write(BigString("bar", kPayloadSize)); | |
418 | Write("foo"); | |
419 | // Least significant size byte is stored in header[4]. | |
420 | IncrementByte(4, 1); | |
494da23a | 421 | if (!recyclable_log) { |
7c673cae FG |
422 | ASSERT_EQ("foo", Read()); |
423 | ASSERT_EQ(kBlockSize, DroppedBytes()); | |
424 | ASSERT_EQ("OK", MatchError("bad record length")); | |
425 | } else { | |
426 | ASSERT_EQ("EOF", Read()); | |
427 | } | |
428 | } | |
429 | ||
430 | TEST_P(LogTest, BadLengthAtEndIsIgnored) { | |
494da23a TL |
431 | if (allow_retry_read_) { |
432 | // If read retry is allowed, then we should not raise an error when the | |
433 | // record length specified in header is longer than data currently | |
434 | // available. It's possible that the body of the record is not written yet. | |
435 | return; | |
436 | } | |
7c673cae FG |
437 | Write("foo"); |
438 | ShrinkSize(1); | |
439 | ASSERT_EQ("EOF", Read()); | |
440 | ASSERT_EQ(0U, DroppedBytes()); | |
441 | ASSERT_EQ("", ReportMessage()); | |
442 | } | |
443 | ||
444 | TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { | |
494da23a TL |
445 | if (allow_retry_read_) { |
446 | // If read retry is allowed, then we should not raise an error when the | |
447 | // record length specified in header is longer than data currently | |
448 | // available. It's possible that the body of the record is not written yet. | |
449 | return; | |
450 | } | |
7c673cae FG |
451 | Write("foo"); |
452 | ShrinkSize(1); | |
453 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
454 | ASSERT_GT(DroppedBytes(), 0U); | |
20effc67 | 455 | ASSERT_EQ("OK", MatchError("Corruption: truncated record body")); |
7c673cae FG |
456 | } |
457 | ||
458 | TEST_P(LogTest, ChecksumMismatch) { | |
459 | Write("foooooo"); | |
460 | IncrementByte(0, 14); | |
461 | ASSERT_EQ("EOF", Read()); | |
494da23a TL |
462 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
463 | if (!recyclable_log) { | |
7c673cae FG |
464 | ASSERT_EQ(14U, DroppedBytes()); |
465 | ASSERT_EQ("OK", MatchError("checksum mismatch")); | |
466 | } else { | |
467 | ASSERT_EQ(0U, DroppedBytes()); | |
468 | ASSERT_EQ("", ReportMessage()); | |
469 | } | |
470 | } | |
471 | ||
472 | TEST_P(LogTest, UnexpectedMiddleType) { | |
473 | Write("foo"); | |
494da23a TL |
474 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
475 | SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType | |
476 | : kMiddleType)); | |
477 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
478 | ASSERT_EQ("EOF", Read()); |
479 | ASSERT_EQ(3U, DroppedBytes()); | |
480 | ASSERT_EQ("OK", MatchError("missing start")); | |
481 | } | |
482 | ||
483 | TEST_P(LogTest, UnexpectedLastType) { | |
484 | Write("foo"); | |
494da23a TL |
485 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
486 | SetByte(6, | |
487 | static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType)); | |
488 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
489 | ASSERT_EQ("EOF", Read()); |
490 | ASSERT_EQ(3U, DroppedBytes()); | |
491 | ASSERT_EQ("OK", MatchError("missing start")); | |
492 | } | |
493 | ||
494 | TEST_P(LogTest, UnexpectedFullType) { | |
495 | Write("foo"); | |
496 | Write("bar"); | |
494da23a TL |
497 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
498 | SetByte( | |
499 | 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType)); | |
500 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
501 | ASSERT_EQ("bar", Read()); |
502 | ASSERT_EQ("EOF", Read()); | |
503 | ASSERT_EQ(3U, DroppedBytes()); | |
504 | ASSERT_EQ("OK", MatchError("partial record without end")); | |
505 | } | |
506 | ||
507 | TEST_P(LogTest, UnexpectedFirstType) { | |
508 | Write("foo"); | |
509 | Write(BigString("bar", 100000)); | |
494da23a TL |
510 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
511 | SetByte( | |
512 | 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType)); | |
513 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
514 | ASSERT_EQ(BigString("bar", 100000), Read()); |
515 | ASSERT_EQ("EOF", Read()); | |
516 | ASSERT_EQ(3U, DroppedBytes()); | |
517 | ASSERT_EQ("OK", MatchError("partial record without end")); | |
518 | } | |
519 | ||
520 | TEST_P(LogTest, MissingLastIsIgnored) { | |
521 | Write(BigString("bar", kBlockSize)); | |
522 | // Remove the LAST block, including header. | |
523 | ShrinkSize(14); | |
524 | ASSERT_EQ("EOF", Read()); | |
525 | ASSERT_EQ("", ReportMessage()); | |
526 | ASSERT_EQ(0U, DroppedBytes()); | |
527 | } | |
528 | ||
529 | TEST_P(LogTest, MissingLastIsNotIgnored) { | |
494da23a TL |
530 | if (allow_retry_read_) { |
531 | // If read retry is allowed, then truncated trailing record should not | |
532 | // raise an error. | |
533 | return; | |
534 | } | |
7c673cae FG |
535 | Write(BigString("bar", kBlockSize)); |
536 | // Remove the LAST block, including header. | |
537 | ShrinkSize(14); | |
538 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
539 | ASSERT_GT(DroppedBytes(), 0U); | |
540 | ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data")); | |
541 | } | |
542 | ||
543 | TEST_P(LogTest, PartialLastIsIgnored) { | |
544 | Write(BigString("bar", kBlockSize)); | |
545 | // Cause a bad record length in the LAST block. | |
546 | ShrinkSize(1); | |
547 | ASSERT_EQ("EOF", Read()); | |
548 | ASSERT_EQ("", ReportMessage()); | |
549 | ASSERT_EQ(0U, DroppedBytes()); | |
550 | } | |
551 | ||
552 | TEST_P(LogTest, PartialLastIsNotIgnored) { | |
494da23a TL |
553 | if (allow_retry_read_) { |
554 | // If read retry is allowed, then truncated trailing record should not | |
555 | // raise an error. | |
556 | return; | |
557 | } | |
7c673cae FG |
558 | Write(BigString("bar", kBlockSize)); |
559 | // Cause a bad record length in the LAST block. | |
560 | ShrinkSize(1); | |
561 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
562 | ASSERT_GT(DroppedBytes(), 0U); | |
20effc67 | 563 | ASSERT_EQ("OK", MatchError("Corruption: truncated record body")); |
7c673cae FG |
564 | } |
565 | ||
566 | TEST_P(LogTest, ErrorJoinsRecords) { | |
567 | // Consider two fragmented records: | |
568 | // first(R1) last(R1) first(R2) last(R2) | |
569 | // where the middle two fragments disappear. We do not want | |
570 | // first(R1),last(R2) to get joined and returned as a valid record. | |
571 | ||
572 | // Write records that span two blocks | |
573 | Write(BigString("foo", kBlockSize)); | |
574 | Write(BigString("bar", kBlockSize)); | |
575 | Write("correct"); | |
576 | ||
577 | // Wipe the middle block | |
1e59de90 | 578 | for (unsigned int offset = kBlockSize; offset < 2 * kBlockSize; offset++) { |
7c673cae FG |
579 | SetByte(offset, 'x'); |
580 | } | |
581 | ||
494da23a TL |
582 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
583 | if (!recyclable_log) { | |
7c673cae FG |
584 | ASSERT_EQ("correct", Read()); |
585 | ASSERT_EQ("EOF", Read()); | |
586 | size_t dropped = DroppedBytes(); | |
587 | ASSERT_LE(dropped, 2 * kBlockSize + 100); | |
588 | ASSERT_GE(dropped, 2 * kBlockSize); | |
589 | } else { | |
590 | ASSERT_EQ("EOF", Read()); | |
591 | } | |
592 | } | |
593 | ||
7c673cae FG |
594 | TEST_P(LogTest, ClearEofSingleBlock) { |
595 | Write("foo"); | |
596 | Write("bar"); | |
494da23a TL |
597 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
598 | int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
599 | ForceEOF(3 + header_size + 2); |
600 | ASSERT_EQ("foo", Read()); | |
601 | UnmarkEOF(); | |
602 | ASSERT_EQ("bar", Read()); | |
603 | ASSERT_TRUE(IsEOF()); | |
604 | ASSERT_EQ("EOF", Read()); | |
605 | Write("xxx"); | |
606 | UnmarkEOF(); | |
607 | ASSERT_EQ("xxx", Read()); | |
608 | ASSERT_TRUE(IsEOF()); | |
609 | } | |
610 | ||
611 | TEST_P(LogTest, ClearEofMultiBlock) { | |
612 | size_t num_full_blocks = 5; | |
494da23a TL |
613 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
614 | int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
615 | size_t n = (kBlockSize - header_size) * num_full_blocks + 25; |
616 | Write(BigString("foo", n)); | |
617 | Write(BigString("bar", n)); | |
618 | ForceEOF(n + num_full_blocks * header_size + header_size + 3); | |
619 | ASSERT_EQ(BigString("foo", n), Read()); | |
620 | ASSERT_TRUE(IsEOF()); | |
621 | UnmarkEOF(); | |
622 | ASSERT_EQ(BigString("bar", n), Read()); | |
623 | ASSERT_TRUE(IsEOF()); | |
624 | Write(BigString("xxx", n)); | |
625 | UnmarkEOF(); | |
626 | ASSERT_EQ(BigString("xxx", n), Read()); | |
627 | ASSERT_TRUE(IsEOF()); | |
628 | } | |
629 | ||
630 | TEST_P(LogTest, ClearEofError) { | |
631 | // If an error occurs during Read() in UnmarkEOF(), the records contained | |
632 | // in the buffer should be returned on subsequent calls of ReadRecord() | |
633 | // until no more full records are left, whereafter ReadRecord() should return | |
634 | // false to indicate that it cannot read any further. | |
635 | ||
636 | Write("foo"); | |
637 | Write("bar"); | |
638 | UnmarkEOF(); | |
639 | ASSERT_EQ("foo", Read()); | |
640 | ASSERT_TRUE(IsEOF()); | |
641 | Write("xxx"); | |
642 | ForceError(0); | |
643 | UnmarkEOF(); | |
644 | ASSERT_EQ("bar", Read()); | |
645 | ASSERT_EQ("EOF", Read()); | |
646 | } | |
647 | ||
648 | TEST_P(LogTest, ClearEofError2) { | |
649 | Write("foo"); | |
650 | Write("bar"); | |
651 | UnmarkEOF(); | |
652 | ASSERT_EQ("foo", Read()); | |
653 | Write("xxx"); | |
654 | ForceError(3); | |
655 | UnmarkEOF(); | |
656 | ASSERT_EQ("bar", Read()); | |
657 | ASSERT_EQ("EOF", Read()); | |
658 | ASSERT_EQ(3U, DroppedBytes()); | |
659 | ASSERT_EQ("OK", MatchError("read error")); | |
660 | } | |
661 | ||
662 | TEST_P(LogTest, Recycle) { | |
494da23a TL |
663 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
664 | if (!recyclable_log) { | |
7c673cae FG |
665 | return; // test is only valid for recycled logs |
666 | } | |
667 | Write("foo"); | |
668 | Write("bar"); | |
669 | Write("baz"); | |
670 | Write("bif"); | |
671 | Write("blitz"); | |
672 | while (get_reader_contents()->size() < log::kBlockSize * 2) { | |
673 | Write("xxxxxxxxxxxxxxxx"); | |
674 | } | |
1e59de90 TL |
675 | std::unique_ptr<FSWritableFile> sink( |
676 | new test::OverwritingStringSink(get_reader_contents())); | |
677 | std::unique_ptr<WritableFileWriter> dest_holder(new WritableFileWriter( | |
678 | std::move(sink), "" /* don't care */, FileOptions())); | |
7c673cae | 679 | Writer recycle_writer(std::move(dest_holder), 123, true); |
1e59de90 TL |
680 | ASSERT_OK(recycle_writer.AddRecord(Slice("foooo"))); |
681 | ASSERT_OK(recycle_writer.AddRecord(Slice("bar"))); | |
7c673cae FG |
682 | ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2); |
683 | ASSERT_EQ("foooo", Read()); | |
684 | ASSERT_EQ("bar", Read()); | |
685 | ASSERT_EQ("EOF", Read()); | |
686 | } | |
687 | ||
1e59de90 TL |
688 | // Do NOT enable compression for this instantiation. |
689 | INSTANTIATE_TEST_CASE_P( | |
690 | Log, LogTest, | |
691 | ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), | |
692 | ::testing::Values(CompressionType::kNoCompression))); | |
494da23a TL |
693 | |
694 | class RetriableLogTest : public ::testing::TestWithParam<int> { | |
695 | private: | |
696 | class ReportCollector : public Reader::Reporter { | |
697 | public: | |
698 | size_t dropped_bytes_; | |
699 | std::string message_; | |
700 | ||
701 | ReportCollector() : dropped_bytes_(0) {} | |
702 | void Corruption(size_t bytes, const Status& status) override { | |
703 | dropped_bytes_ += bytes; | |
704 | message_.append(status.ToString()); | |
705 | } | |
706 | }; | |
707 | ||
708 | Slice contents_; | |
1e59de90 | 709 | test::StringSink* sink_; |
494da23a TL |
710 | std::unique_ptr<Writer> log_writer_; |
711 | Env* env_; | |
494da23a TL |
712 | const std::string test_dir_; |
713 | const std::string log_file_; | |
714 | std::unique_ptr<WritableFileWriter> writer_; | |
715 | std::unique_ptr<SequentialFileReader> reader_; | |
716 | ReportCollector report_; | |
717 | std::unique_ptr<FragmentBufferedReader> log_reader_; | |
718 | ||
719 | public: | |
720 | RetriableLogTest() | |
721 | : contents_(), | |
1e59de90 | 722 | sink_(new test::StringSink(&contents_)), |
494da23a TL |
723 | log_writer_(nullptr), |
724 | env_(Env::Default()), | |
725 | test_dir_(test::PerThreadDBPath("retriable_log_test")), | |
726 | log_file_(test_dir_ + "/log"), | |
727 | writer_(nullptr), | |
728 | reader_(nullptr), | |
1e59de90 TL |
729 | log_reader_(nullptr) { |
730 | std::unique_ptr<FSWritableFile> sink_holder(sink_); | |
731 | std::unique_ptr<WritableFileWriter> wfw(new WritableFileWriter( | |
732 | std::move(sink_holder), "" /* file name */, FileOptions())); | |
733 | log_writer_.reset(new Writer(std::move(wfw), 123, GetParam())); | |
734 | } | |
494da23a TL |
735 | |
736 | Status SetupTestEnv() { | |
494da23a | 737 | Status s; |
1e59de90 TL |
738 | FileOptions fopts; |
739 | auto fs = env_->GetFileSystem(); | |
740 | s = fs->CreateDirIfMissing(test_dir_, IOOptions(), nullptr); | |
741 | std::unique_ptr<FSWritableFile> writable_file; | |
494da23a | 742 | if (s.ok()) { |
1e59de90 | 743 | s = fs->NewWritableFile(log_file_, fopts, &writable_file, nullptr); |
494da23a TL |
744 | } |
745 | if (s.ok()) { | |
1e59de90 TL |
746 | writer_.reset( |
747 | new WritableFileWriter(std::move(writable_file), log_file_, fopts)); | |
748 | EXPECT_NE(writer_, nullptr); | |
494da23a | 749 | } |
1e59de90 | 750 | std::unique_ptr<FSSequentialFile> seq_file; |
494da23a | 751 | if (s.ok()) { |
1e59de90 | 752 | s = fs->NewSequentialFile(log_file_, fopts, &seq_file, nullptr); |
494da23a TL |
753 | } |
754 | if (s.ok()) { | |
1e59de90 TL |
755 | reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_)); |
756 | EXPECT_NE(reader_, nullptr); | |
494da23a TL |
757 | log_reader_.reset(new FragmentBufferedReader( |
758 | nullptr, std::move(reader_), &report_, true /* checksum */, | |
759 | 123 /* log_number */)); | |
1e59de90 | 760 | EXPECT_NE(log_reader_, nullptr); |
494da23a TL |
761 | } |
762 | return s; | |
763 | } | |
764 | ||
1e59de90 | 765 | std::string contents() { return sink_->contents_; } |
494da23a | 766 | |
1e59de90 TL |
767 | void Encode(const std::string& msg) { |
768 | ASSERT_OK(log_writer_->AddRecord(Slice(msg))); | |
769 | } | |
494da23a TL |
770 | |
771 | void Write(const Slice& data) { | |
1e59de90 TL |
772 | ASSERT_OK(writer_->Append(data)); |
773 | ASSERT_OK(writer_->Sync(true)); | |
494da23a TL |
774 | } |
775 | ||
776 | bool TryRead(std::string* result) { | |
777 | assert(result != nullptr); | |
778 | result->clear(); | |
779 | std::string scratch; | |
780 | Slice record; | |
781 | bool r = log_reader_->ReadRecord(&record, &scratch); | |
782 | if (r) { | |
783 | result->assign(record.data(), record.size()); | |
784 | return true; | |
785 | } else { | |
786 | return false; | |
787 | } | |
788 | } | |
789 | }; | |
790 | ||
791 | TEST_P(RetriableLogTest, TailLog_PartialHeader) { | |
792 | ASSERT_OK(SetupTestEnv()); | |
793 | std::vector<int> remaining_bytes_in_last_record; | |
794 | size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; | |
795 | bool eof = false; | |
796 | SyncPoint::GetInstance()->DisableProcessing(); | |
797 | SyncPoint::GetInstance()->LoadDependency( | |
798 | {{"RetriableLogTest::TailLog:AfterPart1", | |
799 | "RetriableLogTest::TailLog:BeforeReadRecord"}, | |
800 | {"FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
801 | "RetriableLogTest::TailLog:BeforePart2"}}); | |
802 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
803 | SyncPoint::GetInstance()->SetCallBack( | |
804 | "FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
805 | [&](void* /*arg*/) { eof = true; }); | |
806 | SyncPoint::GetInstance()->EnableProcessing(); | |
807 | ||
808 | size_t delta = header_size - 1; | |
809 | port::Thread log_writer_thread([&]() { | |
810 | size_t old_sz = contents().size(); | |
811 | Encode("foo"); | |
812 | size_t new_sz = contents().size(); | |
813 | std::string part1 = contents().substr(old_sz, delta); | |
814 | std::string part2 = | |
815 | contents().substr(old_sz + delta, new_sz - old_sz - delta); | |
816 | Write(Slice(part1)); | |
817 | TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); | |
818 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); | |
819 | Write(Slice(part2)); | |
820 | }); | |
821 | ||
822 | std::string record; | |
823 | port::Thread log_reader_thread([&]() { | |
824 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); | |
825 | while (!TryRead(&record)) { | |
826 | } | |
827 | }); | |
828 | log_reader_thread.join(); | |
829 | log_writer_thread.join(); | |
830 | ASSERT_EQ("foo", record); | |
831 | ASSERT_TRUE(eof); | |
832 | } | |
833 | ||
834 | TEST_P(RetriableLogTest, TailLog_FullHeader) { | |
835 | ASSERT_OK(SetupTestEnv()); | |
836 | std::vector<int> remaining_bytes_in_last_record; | |
837 | size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; | |
838 | bool eof = false; | |
839 | SyncPoint::GetInstance()->DisableProcessing(); | |
840 | SyncPoint::GetInstance()->LoadDependency( | |
841 | {{"RetriableLogTest::TailLog:AfterPart1", | |
842 | "RetriableLogTest::TailLog:BeforeReadRecord"}, | |
843 | {"FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
844 | "RetriableLogTest::TailLog:BeforePart2"}}); | |
845 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
846 | SyncPoint::GetInstance()->SetCallBack( | |
847 | "FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
848 | [&](void* /*arg*/) { eof = true; }); | |
849 | SyncPoint::GetInstance()->EnableProcessing(); | |
850 | ||
851 | size_t delta = header_size + 1; | |
852 | port::Thread log_writer_thread([&]() { | |
853 | size_t old_sz = contents().size(); | |
854 | Encode("foo"); | |
855 | size_t new_sz = contents().size(); | |
856 | std::string part1 = contents().substr(old_sz, delta); | |
857 | std::string part2 = | |
858 | contents().substr(old_sz + delta, new_sz - old_sz - delta); | |
859 | Write(Slice(part1)); | |
860 | TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); | |
861 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); | |
862 | Write(Slice(part2)); | |
863 | ASSERT_TRUE(eof); | |
864 | }); | |
865 | ||
866 | std::string record; | |
867 | port::Thread log_reader_thread([&]() { | |
868 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); | |
869 | while (!TryRead(&record)) { | |
870 | } | |
871 | }); | |
872 | log_reader_thread.join(); | |
873 | log_writer_thread.join(); | |
874 | ASSERT_EQ("foo", record); | |
875 | } | |
876 | ||
877 | TEST_P(RetriableLogTest, NonBlockingReadFullRecord) { | |
878 | // Clear all sync point callbacks even if this test does not use sync point. | |
879 | // It is necessary, otherwise the execute of this test may hit a sync point | |
880 | // with which a callback is registered. The registered callback may access | |
881 | // some dead variable, causing segfault. | |
882 | SyncPoint::GetInstance()->DisableProcessing(); | |
883 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
884 | ASSERT_OK(SetupTestEnv()); | |
885 | size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; | |
886 | size_t delta = header_size - 1; | |
887 | size_t old_sz = contents().size(); | |
888 | Encode("foo-bar"); | |
889 | size_t new_sz = contents().size(); | |
890 | std::string part1 = contents().substr(old_sz, delta); | |
891 | std::string part2 = | |
892 | contents().substr(old_sz + delta, new_sz - old_sz - delta); | |
893 | Write(Slice(part1)); | |
894 | std::string record; | |
895 | ASSERT_FALSE(TryRead(&record)); | |
896 | ASSERT_TRUE(record.empty()); | |
897 | Write(Slice(part2)); | |
898 | ASSERT_TRUE(TryRead(&record)); | |
899 | ASSERT_EQ("foo-bar", record); | |
900 | } | |
901 | ||
902 | INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2)); | |
7c673cae | 903 | |
1e59de90 TL |
904 | class CompressionLogTest : public LogTest { |
905 | public: | |
906 | Status SetupTestEnv() { return writer_->AddCompressionTypeRecord(); } | |
907 | }; | |
908 | ||
909 | TEST_P(CompressionLogTest, Empty) { | |
910 | CompressionType compression_type = std::get<2>(GetParam()); | |
911 | if (!StreamingCompressionTypeSupported(compression_type)) { | |
912 | ROCKSDB_GTEST_SKIP("Test requires support for compression type"); | |
913 | return; | |
914 | } | |
915 | ASSERT_OK(SetupTestEnv()); | |
916 | const bool compression_enabled = | |
917 | std::get<2>(GetParam()) == kNoCompression ? false : true; | |
918 | // If WAL compression is enabled, a record is added for the compression type | |
919 | const int compression_record_size = compression_enabled ? kHeaderSize + 4 : 0; | |
920 | ASSERT_EQ(compression_record_size, WrittenBytes()); | |
921 | ASSERT_EQ("EOF", Read()); | |
922 | } | |
923 | ||
924 | TEST_P(CompressionLogTest, ReadWrite) { | |
925 | CompressionType compression_type = std::get<2>(GetParam()); | |
926 | if (!StreamingCompressionTypeSupported(compression_type)) { | |
927 | ROCKSDB_GTEST_SKIP("Test requires support for compression type"); | |
928 | return; | |
929 | } | |
930 | ASSERT_OK(SetupTestEnv()); | |
931 | Write("foo"); | |
932 | Write("bar"); | |
933 | Write(""); | |
934 | Write("xxxx"); | |
935 | ASSERT_EQ("foo", Read()); | |
936 | ASSERT_EQ("bar", Read()); | |
937 | ASSERT_EQ("", Read()); | |
938 | ASSERT_EQ("xxxx", Read()); | |
939 | ASSERT_EQ("EOF", Read()); | |
940 | ASSERT_EQ("EOF", Read()); // Make sure reads at eof work | |
941 | } | |
942 | ||
943 | TEST_P(CompressionLogTest, ManyBlocks) { | |
944 | CompressionType compression_type = std::get<2>(GetParam()); | |
945 | if (!StreamingCompressionTypeSupported(compression_type)) { | |
946 | ROCKSDB_GTEST_SKIP("Test requires support for compression type"); | |
947 | return; | |
948 | } | |
949 | ASSERT_OK(SetupTestEnv()); | |
950 | for (int i = 0; i < 100000; i++) { | |
951 | Write(NumberString(i)); | |
952 | } | |
953 | for (int i = 0; i < 100000; i++) { | |
954 | ASSERT_EQ(NumberString(i), Read()); | |
955 | } | |
956 | ASSERT_EQ("EOF", Read()); | |
957 | } | |
958 | ||
959 | TEST_P(CompressionLogTest, Fragmentation) { | |
960 | CompressionType compression_type = std::get<2>(GetParam()); | |
961 | if (!StreamingCompressionTypeSupported(compression_type)) { | |
962 | ROCKSDB_GTEST_SKIP("Test requires support for compression type"); | |
963 | return; | |
964 | } | |
965 | ASSERT_OK(SetupTestEnv()); | |
966 | Random rnd(301); | |
967 | const std::vector<std::string> wal_entries = { | |
968 | "small", | |
969 | rnd.RandomBinaryString(3 * kBlockSize / 2), // Spans into block 2 | |
970 | rnd.RandomBinaryString(3 * kBlockSize), // Spans into block 5 | |
971 | }; | |
972 | for (const std::string& wal_entry : wal_entries) { | |
973 | Write(wal_entry); | |
974 | } | |
975 | ||
976 | for (const std::string& wal_entry : wal_entries) { | |
977 | ASSERT_EQ(wal_entry, Read()); | |
978 | } | |
979 | ASSERT_EQ("EOF", Read()); | |
980 | } | |
981 | ||
982 | INSTANTIATE_TEST_CASE_P( | |
983 | Compression, CompressionLogTest, | |
984 | ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), | |
985 | ::testing::Values(CompressionType::kNoCompression, | |
986 | CompressionType::kZSTD))); | |
987 | ||
988 | class StreamingCompressionTest | |
989 | : public ::testing::TestWithParam<std::tuple<int, CompressionType>> {}; | |
990 | ||
991 | TEST_P(StreamingCompressionTest, Basic) { | |
992 | size_t input_size = std::get<0>(GetParam()); | |
993 | CompressionType compression_type = std::get<1>(GetParam()); | |
994 | if (!StreamingCompressionTypeSupported(compression_type)) { | |
995 | ROCKSDB_GTEST_SKIP("Test requires support for compression type"); | |
996 | return; | |
997 | } | |
998 | CompressionOptions opts; | |
999 | constexpr uint32_t compression_format_version = 2; | |
1000 | StreamingCompress* compress = StreamingCompress::Create( | |
1001 | compression_type, opts, compression_format_version, kBlockSize); | |
1002 | StreamingUncompress* uncompress = StreamingUncompress::Create( | |
1003 | compression_type, compression_format_version, kBlockSize); | |
1004 | MemoryAllocator* allocator = new DefaultMemoryAllocator(); | |
1005 | std::string input_buffer = BigString("abc", input_size); | |
1006 | std::vector<std::string> compressed_buffers; | |
1007 | size_t remaining; | |
1008 | // Call compress till the entire input is consumed | |
1009 | do { | |
1010 | char* output_buffer = (char*)allocator->Allocate(kBlockSize); | |
1011 | size_t output_pos; | |
1012 | remaining = compress->Compress(input_buffer.c_str(), input_size, | |
1013 | output_buffer, &output_pos); | |
1014 | if (output_pos > 0) { | |
1015 | std::string compressed_buffer; | |
1016 | compressed_buffer.assign(output_buffer, output_pos); | |
1017 | compressed_buffers.emplace_back(std::move(compressed_buffer)); | |
1018 | } | |
1019 | allocator->Deallocate((void*)output_buffer); | |
1020 | } while (remaining > 0); | |
1021 | std::string uncompressed_buffer = ""; | |
1022 | int ret_val = 0; | |
1023 | size_t output_pos; | |
1024 | char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize); | |
1025 | // Uncompress the fragments and concatenate them. | |
1026 | for (int i = 0; i < (int)compressed_buffers.size(); i++) { | |
1027 | // Call uncompress till either the entire input is consumed or the output | |
1028 | // buffer size is equal to the allocated output buffer size. | |
1029 | do { | |
1030 | ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(), | |
1031 | compressed_buffers[i].size(), | |
1032 | uncompressed_output_buffer, &output_pos); | |
1033 | if (output_pos > 0) { | |
1034 | std::string uncompressed_fragment; | |
1035 | uncompressed_fragment.assign(uncompressed_output_buffer, output_pos); | |
1036 | uncompressed_buffer += uncompressed_fragment; | |
1037 | } | |
1038 | } while (ret_val > 0 || output_pos == kBlockSize); | |
1039 | } | |
1040 | allocator->Deallocate((void*)uncompressed_output_buffer); | |
1041 | delete allocator; | |
1042 | delete compress; | |
1043 | delete uncompress; | |
1044 | // The final return value from uncompress() should be 0. | |
1045 | ASSERT_EQ(ret_val, 0); | |
1046 | ASSERT_EQ(input_buffer, uncompressed_buffer); | |
1047 | } | |
1048 | ||
1049 | INSTANTIATE_TEST_CASE_P( | |
1050 | StreamingCompression, StreamingCompressionTest, | |
1051 | ::testing::Combine(::testing::Values(10, 100, 1000, kBlockSize, | |
1052 | kBlockSize * 2), | |
1053 | ::testing::Values(CompressionType::kZSTD))); | |
1054 | ||
7c673cae | 1055 | } // namespace log |
f67539c2 | 1056 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
1057 | |
1058 | int main(int argc, char** argv) { | |
1e59de90 | 1059 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
7c673cae FG |
1060 | ::testing::InitGoogleTest(&argc, argv); |
1061 | return RUN_ALL_TESTS(); | |
1062 | } |