]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/log_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / log_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/log_reader.h"
11#include "db/log_writer.h"
f67539c2
TL
12#include "file/sequence_file_reader.h"
13#include "file/writable_file_writer.h"
7c673cae 14#include "rocksdb/env.h"
f67539c2
TL
15#include "test_util/testharness.h"
16#include "test_util/testutil.h"
7c673cae
FG
17#include "util/coding.h"
18#include "util/crc32c.h"
7c673cae 19#include "util/random.h"
1e59de90 20#include "utilities/memory_allocators.h"
7c673cae 21
f67539c2 22namespace ROCKSDB_NAMESPACE {
7c673cae
FG
23namespace log {
24
25// Construct a string of the specified length made out of the supplied
26// partial string.
27static std::string BigString(const std::string& partial_string, size_t n) {
28 std::string result;
29 while (result.size() < n) {
30 result.append(partial_string);
31 }
32 result.resize(n);
33 return result;
34}
35
36// Construct a string from a number
37static std::string NumberString(int n) {
38 char buf[50];
39 snprintf(buf, sizeof(buf), "%d.", n);
40 return std::string(buf);
41}
42
43// Return a skewed potentially long string
44static std::string RandomSkewedString(int i, Random* rnd) {
45 return BigString(NumberString(i), rnd->Skewed(17));
46}
47
494da23a
TL
48// Param type is tuple<int, bool>
49// get<0>(tuple): non-zero if recycling log, zero if regular log
50// get<1>(tuple): true if allow retry after read EOF, false otherwise
1e59de90
TL
51class LogTest
52 : public ::testing::TestWithParam<std::tuple<int, bool, CompressionType>> {
7c673cae 53 private:
1e59de90 54 class StringSource : public FSSequentialFile {
7c673cae
FG
55 public:
56 Slice& contents_;
57 bool force_error_;
58 size_t force_error_position_;
59 bool force_eof_;
60 size_t force_eof_position_;
61 bool returned_partial_;
494da23a
TL
62 bool fail_after_read_partial_;
63 explicit StringSource(Slice& contents, bool fail_after_read_partial)
64 : contents_(contents),
65 force_error_(false),
66 force_error_position_(0),
67 force_eof_(false),
68 force_eof_position_(0),
69 returned_partial_(false),
70 fail_after_read_partial_(fail_after_read_partial) {}
71
1e59de90
TL
72 IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result,
73 char* scratch, IODebugContext* /*dbg*/) override {
494da23a
TL
74 if (fail_after_read_partial_) {
75 EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
76 }
7c673cae
FG
77
78 if (force_error_) {
79 if (force_error_position_ >= n) {
80 force_error_position_ -= n;
81 } else {
82 *result = Slice(contents_.data(), force_error_position_);
83 contents_.remove_prefix(force_error_position_);
84 force_error_ = false;
85 returned_partial_ = true;
1e59de90 86 return IOStatus::Corruption("read error");
7c673cae
FG
87 }
88 }
89
90 if (contents_.size() < n) {
91 n = contents_.size();
92 returned_partial_ = true;
93 }
94
95 if (force_eof_) {
96 if (force_eof_position_ >= n) {
97 force_eof_position_ -= n;
98 } else {
99 force_eof_ = false;
100 n = force_eof_position_;
101 returned_partial_ = true;
102 }
103 }
104
105 // By using scratch we ensure that caller has control over the
106 // lifetime of result.data()
107 memcpy(scratch, contents_.data(), n);
108 *result = Slice(scratch, n);
109
110 contents_.remove_prefix(n);
1e59de90 111 return IOStatus::OK();
7c673cae
FG
112 }
113
1e59de90 114 IOStatus Skip(uint64_t n) override {
7c673cae
FG
115 if (n > contents_.size()) {
116 contents_.clear();
1e59de90 117 return IOStatus::NotFound("in-memory file skipepd past end");
7c673cae
FG
118 }
119
120 contents_.remove_prefix(n);
121
1e59de90 122 return IOStatus::OK();
7c673cae
FG
123 }
124 };
125
126 class ReportCollector : public Reader::Reporter {
127 public:
128 size_t dropped_bytes_;
129 std::string message_;
130
1e59de90 131 ReportCollector() : dropped_bytes_(0) {}
494da23a 132 void Corruption(size_t bytes, const Status& status) override {
7c673cae
FG
133 dropped_bytes_ += bytes;
134 message_.append(status.ToString());
135 }
136 };
137
1e59de90 138 std::string& dest_contents() { return sink_->contents_; }
7c673cae 139
1e59de90 140 const std::string& dest_contents() const { return sink_->contents_; }
7c673cae 141
1e59de90 142 void reset_source_contents() { source_->contents_ = dest_contents(); }
7c673cae
FG
143
144 Slice reader_contents_;
1e59de90
TL
145 test::StringSink* sink_;
146 StringSource* source_;
7c673cae 147 ReportCollector report_;
7c673cae 148
494da23a 149 protected:
1e59de90
TL
150 std::unique_ptr<Writer> writer_;
151 std::unique_ptr<Reader> reader_;
494da23a 152 bool allow_retry_read_;
1e59de90 153 CompressionType compression_type_;
7c673cae
FG
154
155 public:
156 LogTest()
157 : reader_contents_(),
1e59de90
TL
158 sink_(new test::StringSink(&reader_contents_)),
159 source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))),
160 allow_retry_read_(std::get<1>(GetParam())),
161 compression_type_(std::get<2>(GetParam())) {
162 std::unique_ptr<FSWritableFile> sink_holder(sink_);
163 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
164 std::move(sink_holder), "" /* don't care */, FileOptions()));
165 Writer* writer =
166 new Writer(std::move(file_writer), 123, std::get<0>(GetParam()), false,
167 compression_type_);
168 writer_.reset(writer);
169 std::unique_ptr<FSSequentialFile> source_holder(source_);
170 std::unique_ptr<SequentialFileReader> file_reader(
171 new SequentialFileReader(std::move(source_holder), "" /* file name */));
494da23a 172 if (allow_retry_read_) {
1e59de90
TL
173 reader_.reset(new FragmentBufferedReader(nullptr, std::move(file_reader),
174 &report_, true /* checksum */,
175 123 /* log_number */));
494da23a 176 } else {
1e59de90 177 reader_.reset(new Reader(nullptr, std::move(file_reader), &report_,
494da23a
TL
178 true /* checksum */, 123 /* log_number */));
179 }
7c673cae
FG
180 }
181
182 Slice* get_reader_contents() { return &reader_contents_; }
183
184 void Write(const std::string& msg) {
1e59de90 185 ASSERT_OK(writer_->AddRecord(Slice(msg)));
7c673cae
FG
186 }
187
1e59de90 188 size_t WrittenBytes() const { return dest_contents().size(); }
7c673cae
FG
189
190 std::string Read(const WALRecoveryMode wal_recovery_mode =
191 WALRecoveryMode::kTolerateCorruptedTailRecords) {
192 std::string scratch;
193 Slice record;
494da23a 194 bool ret = false;
1e59de90
TL
195 uint64_t record_checksum;
196 ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode,
197 &record_checksum);
494da23a 198 if (ret) {
1e59de90
TL
199 if (!allow_retry_read_) {
200 // allow_retry_read_ means using FragmentBufferedReader which does not
201 // support record checksum yet.
202 uint64_t actual_record_checksum =
203 XXH3_64bits(record.data(), record.size());
204 assert(actual_record_checksum == record_checksum);
205 }
7c673cae
FG
206 return record.ToString();
207 } else {
208 return "EOF";
209 }
210 }
211
11fdf7f2 212 void IncrementByte(int offset, char delta) {
7c673cae
FG
213 dest_contents()[offset] += delta;
214 }
215
216 void SetByte(int offset, char new_byte) {
217 dest_contents()[offset] = new_byte;
218 }
219
1e59de90 220 void ShrinkSize(int bytes) { sink_->Drop(bytes); }
7c673cae
FG
221
222 void FixChecksum(int header_offset, int len, bool recyclable) {
223 // Compute crc of type/len/data
224 int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
225 uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
226 header_size - 6 + len);
227 crc = crc32c::Mask(crc);
228 EncodeFixed32(&dest_contents()[header_offset], crc);
229 }
230
231 void ForceError(size_t position = 0) {
1e59de90
TL
232 source_->force_error_ = true;
233 source_->force_error_position_ = position;
7c673cae
FG
234 }
235
1e59de90 236 size_t DroppedBytes() const { return report_.dropped_bytes_; }
7c673cae 237
1e59de90 238 std::string ReportMessage() const { return report_.message_; }
7c673cae
FG
239
240 void ForceEOF(size_t position = 0) {
1e59de90
TL
241 source_->force_eof_ = true;
242 source_->force_eof_position_ = position;
7c673cae
FG
243 }
244
245 void UnmarkEOF() {
1e59de90 246 source_->returned_partial_ = false;
494da23a 247 reader_->UnmarkEOF();
7c673cae
FG
248 }
249
494da23a 250 bool IsEOF() { return reader_->IsEOF(); }
7c673cae
FG
251
252 // Returns OK iff recorded error message contains "msg"
253 std::string MatchError(const std::string& msg) const {
254 if (report_.message_.find(msg) == std::string::npos) {
255 return report_.message_;
256 } else {
257 return "OK";
258 }
259 }
7c673cae
FG
260};
261
7c673cae
FG
262TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
263
264TEST_P(LogTest, ReadWrite) {
265 Write("foo");
266 Write("bar");
267 Write("");
268 Write("xxxx");
269 ASSERT_EQ("foo", Read());
270 ASSERT_EQ("bar", Read());
271 ASSERT_EQ("", Read());
272 ASSERT_EQ("xxxx", Read());
273 ASSERT_EQ("EOF", Read());
274 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
275}
276
277TEST_P(LogTest, ManyBlocks) {
278 for (int i = 0; i < 100000; i++) {
279 Write(NumberString(i));
280 }
281 for (int i = 0; i < 100000; i++) {
282 ASSERT_EQ(NumberString(i), Read());
283 }
284 ASSERT_EQ("EOF", Read());
285}
286
287TEST_P(LogTest, Fragmentation) {
288 Write("small");
289 Write(BigString("medium", 50000));
290 Write(BigString("large", 100000));
291 ASSERT_EQ("small", Read());
292 ASSERT_EQ(BigString("medium", 50000), Read());
293 ASSERT_EQ(BigString("large", 100000), Read());
294 ASSERT_EQ("EOF", Read());
295}
296
297TEST_P(LogTest, MarginalTrailer) {
298 // Make a trailer that is exactly the same length as an empty record.
494da23a
TL
299 int header_size =
300 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
301 const int n = kBlockSize - 2 * header_size;
302 Write(BigString("foo", n));
303 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
304 Write("");
305 Write("bar");
306 ASSERT_EQ(BigString("foo", n), Read());
307 ASSERT_EQ("", Read());
308 ASSERT_EQ("bar", Read());
309 ASSERT_EQ("EOF", Read());
310}
311
312TEST_P(LogTest, MarginalTrailer2) {
313 // Make a trailer that is exactly the same length as an empty record.
494da23a
TL
314 int header_size =
315 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
316 const int n = kBlockSize - 2 * header_size;
317 Write(BigString("foo", n));
318 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
319 Write("bar");
320 ASSERT_EQ(BigString("foo", n), Read());
321 ASSERT_EQ("bar", Read());
322 ASSERT_EQ("EOF", Read());
323 ASSERT_EQ(0U, DroppedBytes());
324 ASSERT_EQ("", ReportMessage());
325}
326
327TEST_P(LogTest, ShortTrailer) {
494da23a
TL
328 int header_size =
329 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
330 const int n = kBlockSize - 2 * header_size + 4;
331 Write(BigString("foo", n));
332 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
333 Write("");
334 Write("bar");
335 ASSERT_EQ(BigString("foo", n), Read());
336 ASSERT_EQ("", Read());
337 ASSERT_EQ("bar", Read());
338 ASSERT_EQ("EOF", Read());
339}
340
341TEST_P(LogTest, AlignedEof) {
494da23a
TL
342 int header_size =
343 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
344 const int n = kBlockSize - 2 * header_size + 4;
345 Write(BigString("foo", n));
346 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
347 ASSERT_EQ(BigString("foo", n), Read());
348 ASSERT_EQ("EOF", Read());
349}
350
351TEST_P(LogTest, RandomRead) {
352 const int N = 500;
353 Random write_rnd(301);
354 for (int i = 0; i < N; i++) {
355 Write(RandomSkewedString(i, &write_rnd));
356 }
357 Random read_rnd(301);
358 for (int i = 0; i < N; i++) {
359 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
360 }
361 ASSERT_EQ("EOF", Read());
362}
363
364// Tests of all the error paths in log_reader.cc follow:
365
366TEST_P(LogTest, ReadError) {
367 Write("foo");
368 ForceError();
369 ASSERT_EQ("EOF", Read());
370 ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
371 ASSERT_EQ("OK", MatchError("read error"));
372}
373
374TEST_P(LogTest, BadRecordType) {
375 Write("foo");
376 // Type is stored in header[6]
377 IncrementByte(6, 100);
378 FixChecksum(0, 3, false);
379 ASSERT_EQ("EOF", Read());
380 ASSERT_EQ(3U, DroppedBytes());
381 ASSERT_EQ("OK", MatchError("unknown record type"));
382}
383
384TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
385 Write("foo");
1e59de90 386 ShrinkSize(4); // Drop all payload as well as a header byte
7c673cae
FG
387 ASSERT_EQ("EOF", Read());
388 // Truncated last record is ignored, not treated as an error
389 ASSERT_EQ(0U, DroppedBytes());
390 ASSERT_EQ("", ReportMessage());
391}
392
393TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
494da23a
TL
394 if (allow_retry_read_) {
395 // If read retry is allowed, then truncated trailing record should not
396 // raise an error.
397 return;
398 }
7c673cae
FG
399 Write("foo");
400 ShrinkSize(4); // Drop all payload as well as a header byte
401 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
402 // Truncated last record is ignored, not treated as an error
403 ASSERT_GT(DroppedBytes(), 0U);
404 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
405}
406
407TEST_P(LogTest, BadLength) {
494da23a
TL
408 if (allow_retry_read_) {
409 // If read retry is allowed, then we should not raise an error when the
410 // record length specified in header is longer than data currently
411 // available. It's possible that the body of the record is not written yet.
412 return;
413 }
414 bool recyclable_log = (std::get<0>(GetParam()) != 0);
415 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
416 const int kPayloadSize = kBlockSize - header_size;
417 Write(BigString("bar", kPayloadSize));
418 Write("foo");
419 // Least significant size byte is stored in header[4].
420 IncrementByte(4, 1);
494da23a 421 if (!recyclable_log) {
7c673cae
FG
422 ASSERT_EQ("foo", Read());
423 ASSERT_EQ(kBlockSize, DroppedBytes());
424 ASSERT_EQ("OK", MatchError("bad record length"));
425 } else {
426 ASSERT_EQ("EOF", Read());
427 }
428}
429
430TEST_P(LogTest, BadLengthAtEndIsIgnored) {
494da23a
TL
431 if (allow_retry_read_) {
432 // If read retry is allowed, then we should not raise an error when the
433 // record length specified in header is longer than data currently
434 // available. It's possible that the body of the record is not written yet.
435 return;
436 }
7c673cae
FG
437 Write("foo");
438 ShrinkSize(1);
439 ASSERT_EQ("EOF", Read());
440 ASSERT_EQ(0U, DroppedBytes());
441 ASSERT_EQ("", ReportMessage());
442}
443
444TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
494da23a
TL
445 if (allow_retry_read_) {
446 // If read retry is allowed, then we should not raise an error when the
447 // record length specified in header is longer than data currently
448 // available. It's possible that the body of the record is not written yet.
449 return;
450 }
7c673cae
FG
451 Write("foo");
452 ShrinkSize(1);
453 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
454 ASSERT_GT(DroppedBytes(), 0U);
20effc67 455 ASSERT_EQ("OK", MatchError("Corruption: truncated record body"));
7c673cae
FG
456}
457
458TEST_P(LogTest, ChecksumMismatch) {
459 Write("foooooo");
460 IncrementByte(0, 14);
461 ASSERT_EQ("EOF", Read());
494da23a
TL
462 bool recyclable_log = (std::get<0>(GetParam()) != 0);
463 if (!recyclable_log) {
7c673cae
FG
464 ASSERT_EQ(14U, DroppedBytes());
465 ASSERT_EQ("OK", MatchError("checksum mismatch"));
466 } else {
467 ASSERT_EQ(0U, DroppedBytes());
468 ASSERT_EQ("", ReportMessage());
469 }
470}
471
472TEST_P(LogTest, UnexpectedMiddleType) {
473 Write("foo");
494da23a
TL
474 bool recyclable_log = (std::get<0>(GetParam()) != 0);
475 SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
476 : kMiddleType));
477 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
478 ASSERT_EQ("EOF", Read());
479 ASSERT_EQ(3U, DroppedBytes());
480 ASSERT_EQ("OK", MatchError("missing start"));
481}
482
483TEST_P(LogTest, UnexpectedLastType) {
484 Write("foo");
494da23a
TL
485 bool recyclable_log = (std::get<0>(GetParam()) != 0);
486 SetByte(6,
487 static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
488 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
489 ASSERT_EQ("EOF", Read());
490 ASSERT_EQ(3U, DroppedBytes());
491 ASSERT_EQ("OK", MatchError("missing start"));
492}
493
494TEST_P(LogTest, UnexpectedFullType) {
495 Write("foo");
496 Write("bar");
494da23a
TL
497 bool recyclable_log = (std::get<0>(GetParam()) != 0);
498 SetByte(
499 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
500 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
501 ASSERT_EQ("bar", Read());
502 ASSERT_EQ("EOF", Read());
503 ASSERT_EQ(3U, DroppedBytes());
504 ASSERT_EQ("OK", MatchError("partial record without end"));
505}
506
507TEST_P(LogTest, UnexpectedFirstType) {
508 Write("foo");
509 Write(BigString("bar", 100000));
494da23a
TL
510 bool recyclable_log = (std::get<0>(GetParam()) != 0);
511 SetByte(
512 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
513 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
514 ASSERT_EQ(BigString("bar", 100000), Read());
515 ASSERT_EQ("EOF", Read());
516 ASSERT_EQ(3U, DroppedBytes());
517 ASSERT_EQ("OK", MatchError("partial record without end"));
518}
519
520TEST_P(LogTest, MissingLastIsIgnored) {
521 Write(BigString("bar", kBlockSize));
522 // Remove the LAST block, including header.
523 ShrinkSize(14);
524 ASSERT_EQ("EOF", Read());
525 ASSERT_EQ("", ReportMessage());
526 ASSERT_EQ(0U, DroppedBytes());
527}
528
529TEST_P(LogTest, MissingLastIsNotIgnored) {
494da23a
TL
530 if (allow_retry_read_) {
531 // If read retry is allowed, then truncated trailing record should not
532 // raise an error.
533 return;
534 }
7c673cae
FG
535 Write(BigString("bar", kBlockSize));
536 // Remove the LAST block, including header.
537 ShrinkSize(14);
538 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
539 ASSERT_GT(DroppedBytes(), 0U);
540 ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
541}
542
543TEST_P(LogTest, PartialLastIsIgnored) {
544 Write(BigString("bar", kBlockSize));
545 // Cause a bad record length in the LAST block.
546 ShrinkSize(1);
547 ASSERT_EQ("EOF", Read());
548 ASSERT_EQ("", ReportMessage());
549 ASSERT_EQ(0U, DroppedBytes());
550}
551
552TEST_P(LogTest, PartialLastIsNotIgnored) {
494da23a
TL
553 if (allow_retry_read_) {
554 // If read retry is allowed, then truncated trailing record should not
555 // raise an error.
556 return;
557 }
7c673cae
FG
558 Write(BigString("bar", kBlockSize));
559 // Cause a bad record length in the LAST block.
560 ShrinkSize(1);
561 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
562 ASSERT_GT(DroppedBytes(), 0U);
20effc67 563 ASSERT_EQ("OK", MatchError("Corruption: truncated record body"));
7c673cae
FG
564}
565
566TEST_P(LogTest, ErrorJoinsRecords) {
567 // Consider two fragmented records:
568 // first(R1) last(R1) first(R2) last(R2)
569 // where the middle two fragments disappear. We do not want
570 // first(R1),last(R2) to get joined and returned as a valid record.
571
572 // Write records that span two blocks
573 Write(BigString("foo", kBlockSize));
574 Write(BigString("bar", kBlockSize));
575 Write("correct");
576
577 // Wipe the middle block
1e59de90 578 for (unsigned int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
7c673cae
FG
579 SetByte(offset, 'x');
580 }
581
494da23a
TL
582 bool recyclable_log = (std::get<0>(GetParam()) != 0);
583 if (!recyclable_log) {
7c673cae
FG
584 ASSERT_EQ("correct", Read());
585 ASSERT_EQ("EOF", Read());
586 size_t dropped = DroppedBytes();
587 ASSERT_LE(dropped, 2 * kBlockSize + 100);
588 ASSERT_GE(dropped, 2 * kBlockSize);
589 } else {
590 ASSERT_EQ("EOF", Read());
591 }
592}
593
7c673cae
FG
594TEST_P(LogTest, ClearEofSingleBlock) {
595 Write("foo");
596 Write("bar");
494da23a
TL
597 bool recyclable_log = (std::get<0>(GetParam()) != 0);
598 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
599 ForceEOF(3 + header_size + 2);
600 ASSERT_EQ("foo", Read());
601 UnmarkEOF();
602 ASSERT_EQ("bar", Read());
603 ASSERT_TRUE(IsEOF());
604 ASSERT_EQ("EOF", Read());
605 Write("xxx");
606 UnmarkEOF();
607 ASSERT_EQ("xxx", Read());
608 ASSERT_TRUE(IsEOF());
609}
610
611TEST_P(LogTest, ClearEofMultiBlock) {
612 size_t num_full_blocks = 5;
494da23a
TL
613 bool recyclable_log = (std::get<0>(GetParam()) != 0);
614 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
615 size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
616 Write(BigString("foo", n));
617 Write(BigString("bar", n));
618 ForceEOF(n + num_full_blocks * header_size + header_size + 3);
619 ASSERT_EQ(BigString("foo", n), Read());
620 ASSERT_TRUE(IsEOF());
621 UnmarkEOF();
622 ASSERT_EQ(BigString("bar", n), Read());
623 ASSERT_TRUE(IsEOF());
624 Write(BigString("xxx", n));
625 UnmarkEOF();
626 ASSERT_EQ(BigString("xxx", n), Read());
627 ASSERT_TRUE(IsEOF());
628}
629
630TEST_P(LogTest, ClearEofError) {
631 // If an error occurs during Read() in UnmarkEOF(), the records contained
632 // in the buffer should be returned on subsequent calls of ReadRecord()
633 // until no more full records are left, whereafter ReadRecord() should return
634 // false to indicate that it cannot read any further.
635
636 Write("foo");
637 Write("bar");
638 UnmarkEOF();
639 ASSERT_EQ("foo", Read());
640 ASSERT_TRUE(IsEOF());
641 Write("xxx");
642 ForceError(0);
643 UnmarkEOF();
644 ASSERT_EQ("bar", Read());
645 ASSERT_EQ("EOF", Read());
646}
647
648TEST_P(LogTest, ClearEofError2) {
649 Write("foo");
650 Write("bar");
651 UnmarkEOF();
652 ASSERT_EQ("foo", Read());
653 Write("xxx");
654 ForceError(3);
655 UnmarkEOF();
656 ASSERT_EQ("bar", Read());
657 ASSERT_EQ("EOF", Read());
658 ASSERT_EQ(3U, DroppedBytes());
659 ASSERT_EQ("OK", MatchError("read error"));
660}
661
662TEST_P(LogTest, Recycle) {
494da23a
TL
663 bool recyclable_log = (std::get<0>(GetParam()) != 0);
664 if (!recyclable_log) {
7c673cae
FG
665 return; // test is only valid for recycled logs
666 }
667 Write("foo");
668 Write("bar");
669 Write("baz");
670 Write("bif");
671 Write("blitz");
672 while (get_reader_contents()->size() < log::kBlockSize * 2) {
673 Write("xxxxxxxxxxxxxxxx");
674 }
1e59de90
TL
675 std::unique_ptr<FSWritableFile> sink(
676 new test::OverwritingStringSink(get_reader_contents()));
677 std::unique_ptr<WritableFileWriter> dest_holder(new WritableFileWriter(
678 std::move(sink), "" /* don't care */, FileOptions()));
7c673cae 679 Writer recycle_writer(std::move(dest_holder), 123, true);
1e59de90
TL
680 ASSERT_OK(recycle_writer.AddRecord(Slice("foooo")));
681 ASSERT_OK(recycle_writer.AddRecord(Slice("bar")));
7c673cae
FG
682 ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
683 ASSERT_EQ("foooo", Read());
684 ASSERT_EQ("bar", Read());
685 ASSERT_EQ("EOF", Read());
686}
687
1e59de90
TL
688// Do NOT enable compression for this instantiation.
689INSTANTIATE_TEST_CASE_P(
690 Log, LogTest,
691 ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
692 ::testing::Values(CompressionType::kNoCompression)));
494da23a
TL
693
694class RetriableLogTest : public ::testing::TestWithParam<int> {
695 private:
696 class ReportCollector : public Reader::Reporter {
697 public:
698 size_t dropped_bytes_;
699 std::string message_;
700
701 ReportCollector() : dropped_bytes_(0) {}
702 void Corruption(size_t bytes, const Status& status) override {
703 dropped_bytes_ += bytes;
704 message_.append(status.ToString());
705 }
706 };
707
708 Slice contents_;
1e59de90 709 test::StringSink* sink_;
494da23a
TL
710 std::unique_ptr<Writer> log_writer_;
711 Env* env_;
494da23a
TL
712 const std::string test_dir_;
713 const std::string log_file_;
714 std::unique_ptr<WritableFileWriter> writer_;
715 std::unique_ptr<SequentialFileReader> reader_;
716 ReportCollector report_;
717 std::unique_ptr<FragmentBufferedReader> log_reader_;
718
719 public:
720 RetriableLogTest()
721 : contents_(),
1e59de90 722 sink_(new test::StringSink(&contents_)),
494da23a
TL
723 log_writer_(nullptr),
724 env_(Env::Default()),
725 test_dir_(test::PerThreadDBPath("retriable_log_test")),
726 log_file_(test_dir_ + "/log"),
727 writer_(nullptr),
728 reader_(nullptr),
1e59de90
TL
729 log_reader_(nullptr) {
730 std::unique_ptr<FSWritableFile> sink_holder(sink_);
731 std::unique_ptr<WritableFileWriter> wfw(new WritableFileWriter(
732 std::move(sink_holder), "" /* file name */, FileOptions()));
733 log_writer_.reset(new Writer(std::move(wfw), 123, GetParam()));
734 }
494da23a
TL
735
736 Status SetupTestEnv() {
494da23a 737 Status s;
1e59de90
TL
738 FileOptions fopts;
739 auto fs = env_->GetFileSystem();
740 s = fs->CreateDirIfMissing(test_dir_, IOOptions(), nullptr);
741 std::unique_ptr<FSWritableFile> writable_file;
494da23a 742 if (s.ok()) {
1e59de90 743 s = fs->NewWritableFile(log_file_, fopts, &writable_file, nullptr);
494da23a
TL
744 }
745 if (s.ok()) {
1e59de90
TL
746 writer_.reset(
747 new WritableFileWriter(std::move(writable_file), log_file_, fopts));
748 EXPECT_NE(writer_, nullptr);
494da23a 749 }
1e59de90 750 std::unique_ptr<FSSequentialFile> seq_file;
494da23a 751 if (s.ok()) {
1e59de90 752 s = fs->NewSequentialFile(log_file_, fopts, &seq_file, nullptr);
494da23a
TL
753 }
754 if (s.ok()) {
1e59de90
TL
755 reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
756 EXPECT_NE(reader_, nullptr);
494da23a
TL
757 log_reader_.reset(new FragmentBufferedReader(
758 nullptr, std::move(reader_), &report_, true /* checksum */,
759 123 /* log_number */));
1e59de90 760 EXPECT_NE(log_reader_, nullptr);
494da23a
TL
761 }
762 return s;
763 }
764
1e59de90 765 std::string contents() { return sink_->contents_; }
494da23a 766
1e59de90
TL
767 void Encode(const std::string& msg) {
768 ASSERT_OK(log_writer_->AddRecord(Slice(msg)));
769 }
494da23a
TL
770
771 void Write(const Slice& data) {
1e59de90
TL
772 ASSERT_OK(writer_->Append(data));
773 ASSERT_OK(writer_->Sync(true));
494da23a
TL
774 }
775
776 bool TryRead(std::string* result) {
777 assert(result != nullptr);
778 result->clear();
779 std::string scratch;
780 Slice record;
781 bool r = log_reader_->ReadRecord(&record, &scratch);
782 if (r) {
783 result->assign(record.data(), record.size());
784 return true;
785 } else {
786 return false;
787 }
788 }
789};
790
791TEST_P(RetriableLogTest, TailLog_PartialHeader) {
792 ASSERT_OK(SetupTestEnv());
793 std::vector<int> remaining_bytes_in_last_record;
794 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
795 bool eof = false;
796 SyncPoint::GetInstance()->DisableProcessing();
797 SyncPoint::GetInstance()->LoadDependency(
798 {{"RetriableLogTest::TailLog:AfterPart1",
799 "RetriableLogTest::TailLog:BeforeReadRecord"},
800 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
801 "RetriableLogTest::TailLog:BeforePart2"}});
802 SyncPoint::GetInstance()->ClearAllCallBacks();
803 SyncPoint::GetInstance()->SetCallBack(
804 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
805 [&](void* /*arg*/) { eof = true; });
806 SyncPoint::GetInstance()->EnableProcessing();
807
808 size_t delta = header_size - 1;
809 port::Thread log_writer_thread([&]() {
810 size_t old_sz = contents().size();
811 Encode("foo");
812 size_t new_sz = contents().size();
813 std::string part1 = contents().substr(old_sz, delta);
814 std::string part2 =
815 contents().substr(old_sz + delta, new_sz - old_sz - delta);
816 Write(Slice(part1));
817 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
818 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
819 Write(Slice(part2));
820 });
821
822 std::string record;
823 port::Thread log_reader_thread([&]() {
824 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
825 while (!TryRead(&record)) {
826 }
827 });
828 log_reader_thread.join();
829 log_writer_thread.join();
830 ASSERT_EQ("foo", record);
831 ASSERT_TRUE(eof);
832}
833
834TEST_P(RetriableLogTest, TailLog_FullHeader) {
835 ASSERT_OK(SetupTestEnv());
836 std::vector<int> remaining_bytes_in_last_record;
837 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
838 bool eof = false;
839 SyncPoint::GetInstance()->DisableProcessing();
840 SyncPoint::GetInstance()->LoadDependency(
841 {{"RetriableLogTest::TailLog:AfterPart1",
842 "RetriableLogTest::TailLog:BeforeReadRecord"},
843 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
844 "RetriableLogTest::TailLog:BeforePart2"}});
845 SyncPoint::GetInstance()->ClearAllCallBacks();
846 SyncPoint::GetInstance()->SetCallBack(
847 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
848 [&](void* /*arg*/) { eof = true; });
849 SyncPoint::GetInstance()->EnableProcessing();
850
851 size_t delta = header_size + 1;
852 port::Thread log_writer_thread([&]() {
853 size_t old_sz = contents().size();
854 Encode("foo");
855 size_t new_sz = contents().size();
856 std::string part1 = contents().substr(old_sz, delta);
857 std::string part2 =
858 contents().substr(old_sz + delta, new_sz - old_sz - delta);
859 Write(Slice(part1));
860 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
861 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
862 Write(Slice(part2));
863 ASSERT_TRUE(eof);
864 });
865
866 std::string record;
867 port::Thread log_reader_thread([&]() {
868 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
869 while (!TryRead(&record)) {
870 }
871 });
872 log_reader_thread.join();
873 log_writer_thread.join();
874 ASSERT_EQ("foo", record);
875}
876
877TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
878 // Clear all sync point callbacks even if this test does not use sync point.
879 // It is necessary, otherwise the execute of this test may hit a sync point
880 // with which a callback is registered. The registered callback may access
881 // some dead variable, causing segfault.
882 SyncPoint::GetInstance()->DisableProcessing();
883 SyncPoint::GetInstance()->ClearAllCallBacks();
884 ASSERT_OK(SetupTestEnv());
885 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
886 size_t delta = header_size - 1;
887 size_t old_sz = contents().size();
888 Encode("foo-bar");
889 size_t new_sz = contents().size();
890 std::string part1 = contents().substr(old_sz, delta);
891 std::string part2 =
892 contents().substr(old_sz + delta, new_sz - old_sz - delta);
893 Write(Slice(part1));
894 std::string record;
895 ASSERT_FALSE(TryRead(&record));
896 ASSERT_TRUE(record.empty());
897 Write(Slice(part2));
898 ASSERT_TRUE(TryRead(&record));
899 ASSERT_EQ("foo-bar", record);
900}
901
902INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
7c673cae 903
1e59de90
TL
904class CompressionLogTest : public LogTest {
905 public:
906 Status SetupTestEnv() { return writer_->AddCompressionTypeRecord(); }
907};
908
909TEST_P(CompressionLogTest, Empty) {
910 CompressionType compression_type = std::get<2>(GetParam());
911 if (!StreamingCompressionTypeSupported(compression_type)) {
912 ROCKSDB_GTEST_SKIP("Test requires support for compression type");
913 return;
914 }
915 ASSERT_OK(SetupTestEnv());
916 const bool compression_enabled =
917 std::get<2>(GetParam()) == kNoCompression ? false : true;
918 // If WAL compression is enabled, a record is added for the compression type
919 const int compression_record_size = compression_enabled ? kHeaderSize + 4 : 0;
920 ASSERT_EQ(compression_record_size, WrittenBytes());
921 ASSERT_EQ("EOF", Read());
922}
923
924TEST_P(CompressionLogTest, ReadWrite) {
925 CompressionType compression_type = std::get<2>(GetParam());
926 if (!StreamingCompressionTypeSupported(compression_type)) {
927 ROCKSDB_GTEST_SKIP("Test requires support for compression type");
928 return;
929 }
930 ASSERT_OK(SetupTestEnv());
931 Write("foo");
932 Write("bar");
933 Write("");
934 Write("xxxx");
935 ASSERT_EQ("foo", Read());
936 ASSERT_EQ("bar", Read());
937 ASSERT_EQ("", Read());
938 ASSERT_EQ("xxxx", Read());
939 ASSERT_EQ("EOF", Read());
940 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
941}
942
943TEST_P(CompressionLogTest, ManyBlocks) {
944 CompressionType compression_type = std::get<2>(GetParam());
945 if (!StreamingCompressionTypeSupported(compression_type)) {
946 ROCKSDB_GTEST_SKIP("Test requires support for compression type");
947 return;
948 }
949 ASSERT_OK(SetupTestEnv());
950 for (int i = 0; i < 100000; i++) {
951 Write(NumberString(i));
952 }
953 for (int i = 0; i < 100000; i++) {
954 ASSERT_EQ(NumberString(i), Read());
955 }
956 ASSERT_EQ("EOF", Read());
957}
958
959TEST_P(CompressionLogTest, Fragmentation) {
960 CompressionType compression_type = std::get<2>(GetParam());
961 if (!StreamingCompressionTypeSupported(compression_type)) {
962 ROCKSDB_GTEST_SKIP("Test requires support for compression type");
963 return;
964 }
965 ASSERT_OK(SetupTestEnv());
966 Random rnd(301);
967 const std::vector<std::string> wal_entries = {
968 "small",
969 rnd.RandomBinaryString(3 * kBlockSize / 2), // Spans into block 2
970 rnd.RandomBinaryString(3 * kBlockSize), // Spans into block 5
971 };
972 for (const std::string& wal_entry : wal_entries) {
973 Write(wal_entry);
974 }
975
976 for (const std::string& wal_entry : wal_entries) {
977 ASSERT_EQ(wal_entry, Read());
978 }
979 ASSERT_EQ("EOF", Read());
980}
981
982INSTANTIATE_TEST_CASE_P(
983 Compression, CompressionLogTest,
984 ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
985 ::testing::Values(CompressionType::kNoCompression,
986 CompressionType::kZSTD)));
987
988class StreamingCompressionTest
989 : public ::testing::TestWithParam<std::tuple<int, CompressionType>> {};
990
991TEST_P(StreamingCompressionTest, Basic) {
992 size_t input_size = std::get<0>(GetParam());
993 CompressionType compression_type = std::get<1>(GetParam());
994 if (!StreamingCompressionTypeSupported(compression_type)) {
995 ROCKSDB_GTEST_SKIP("Test requires support for compression type");
996 return;
997 }
998 CompressionOptions opts;
999 constexpr uint32_t compression_format_version = 2;
1000 StreamingCompress* compress = StreamingCompress::Create(
1001 compression_type, opts, compression_format_version, kBlockSize);
1002 StreamingUncompress* uncompress = StreamingUncompress::Create(
1003 compression_type, compression_format_version, kBlockSize);
1004 MemoryAllocator* allocator = new DefaultMemoryAllocator();
1005 std::string input_buffer = BigString("abc", input_size);
1006 std::vector<std::string> compressed_buffers;
1007 size_t remaining;
1008 // Call compress till the entire input is consumed
1009 do {
1010 char* output_buffer = (char*)allocator->Allocate(kBlockSize);
1011 size_t output_pos;
1012 remaining = compress->Compress(input_buffer.c_str(), input_size,
1013 output_buffer, &output_pos);
1014 if (output_pos > 0) {
1015 std::string compressed_buffer;
1016 compressed_buffer.assign(output_buffer, output_pos);
1017 compressed_buffers.emplace_back(std::move(compressed_buffer));
1018 }
1019 allocator->Deallocate((void*)output_buffer);
1020 } while (remaining > 0);
1021 std::string uncompressed_buffer = "";
1022 int ret_val = 0;
1023 size_t output_pos;
1024 char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize);
1025 // Uncompress the fragments and concatenate them.
1026 for (int i = 0; i < (int)compressed_buffers.size(); i++) {
1027 // Call uncompress till either the entire input is consumed or the output
1028 // buffer size is equal to the allocated output buffer size.
1029 do {
1030 ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(),
1031 compressed_buffers[i].size(),
1032 uncompressed_output_buffer, &output_pos);
1033 if (output_pos > 0) {
1034 std::string uncompressed_fragment;
1035 uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);
1036 uncompressed_buffer += uncompressed_fragment;
1037 }
1038 } while (ret_val > 0 || output_pos == kBlockSize);
1039 }
1040 allocator->Deallocate((void*)uncompressed_output_buffer);
1041 delete allocator;
1042 delete compress;
1043 delete uncompress;
1044 // The final return value from uncompress() should be 0.
1045 ASSERT_EQ(ret_val, 0);
1046 ASSERT_EQ(input_buffer, uncompressed_buffer);
1047}
1048
1049INSTANTIATE_TEST_CASE_P(
1050 StreamingCompression, StreamingCompressionTest,
1051 ::testing::Combine(::testing::Values(10, 100, 1000, kBlockSize,
1052 kBlockSize * 2),
1053 ::testing::Values(CompressionType::kZSTD)));
1054
7c673cae 1055} // namespace log
f67539c2 1056} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
1057
1058int main(int argc, char** argv) {
1e59de90 1059 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
7c673cae
FG
1060 ::testing::InitGoogleTest(&argc, argv);
1061 return RUN_ALL_TESTS();
1062}