]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/log_test.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / log_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "rocksdb/env.h"
13 #include "util/coding.h"
14 #include "util/crc32c.h"
15 #include "util/file_reader_writer.h"
16 #include "util/random.h"
17 #include "util/testharness.h"
18 #include "util/testutil.h"
19
20 namespace rocksdb {
21 namespace log {
22
// Construct a string of exactly n characters by repeating partial_string and
// truncating the final repetition.
//
// partial_string: the pattern to repeat.
// n:              the length of the returned string.
//
// An empty partial_string cannot fill any length, so an empty string is
// returned in that case (the loop below would otherwise never terminate).
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (partial_string.empty()) {
    return result;
  }
  // The loop can overshoot n by at most one repetition before the resize.
  result.reserve(n + partial_string.size());
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);
  return result;
}
33
// Render n in decimal followed by a single '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  std::string text = std::to_string(n);
  text.push_back('.');
  return text;
}
40
41 // Return a skewed potentially long string
42 static std::string RandomSkewedString(int i, Random* rnd) {
43 return BigString(NumberString(i), rnd->Skewed(17));
44 }
45
46 class LogTest : public ::testing::TestWithParam<int> {
47 private:
48 class StringSource : public SequentialFile {
49 public:
50 Slice& contents_;
51 bool force_error_;
52 size_t force_error_position_;
53 bool force_eof_;
54 size_t force_eof_position_;
55 bool returned_partial_;
56 explicit StringSource(Slice& contents) :
57 contents_(contents),
58 force_error_(false),
59 force_error_position_(0),
60 force_eof_(false),
61 force_eof_position_(0),
62 returned_partial_(false) { }
63
64 virtual Status Read(size_t n, Slice* result, char* scratch) override {
65 EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
66
67 if (force_error_) {
68 if (force_error_position_ >= n) {
69 force_error_position_ -= n;
70 } else {
71 *result = Slice(contents_.data(), force_error_position_);
72 contents_.remove_prefix(force_error_position_);
73 force_error_ = false;
74 returned_partial_ = true;
75 return Status::Corruption("read error");
76 }
77 }
78
79 if (contents_.size() < n) {
80 n = contents_.size();
81 returned_partial_ = true;
82 }
83
84 if (force_eof_) {
85 if (force_eof_position_ >= n) {
86 force_eof_position_ -= n;
87 } else {
88 force_eof_ = false;
89 n = force_eof_position_;
90 returned_partial_ = true;
91 }
92 }
93
94 // By using scratch we ensure that caller has control over the
95 // lifetime of result.data()
96 memcpy(scratch, contents_.data(), n);
97 *result = Slice(scratch, n);
98
99 contents_.remove_prefix(n);
100 return Status::OK();
101 }
102
103 virtual Status Skip(uint64_t n) override {
104 if (n > contents_.size()) {
105 contents_.clear();
106 return Status::NotFound("in-memory file skipepd past end");
107 }
108
109 contents_.remove_prefix(n);
110
111 return Status::OK();
112 }
113 };
114
115 class ReportCollector : public Reader::Reporter {
116 public:
117 size_t dropped_bytes_;
118 std::string message_;
119
120 ReportCollector() : dropped_bytes_(0) { }
121 virtual void Corruption(size_t bytes, const Status& status) override {
122 dropped_bytes_ += bytes;
123 message_.append(status.ToString());
124 }
125 };
126
127 std::string& dest_contents() {
128 auto dest =
129 dynamic_cast<test::StringSink*>(writer_.file()->writable_file());
130 assert(dest);
131 return dest->contents_;
132 }
133
134 const std::string& dest_contents() const {
135 auto dest =
136 dynamic_cast<const test::StringSink*>(writer_.file()->writable_file());
137 assert(dest);
138 return dest->contents_;
139 }
140
141 void reset_source_contents() {
142 auto src = dynamic_cast<StringSource*>(reader_.file()->file());
143 assert(src);
144 src->contents_ = dest_contents();
145 }
146
147 Slice reader_contents_;
148 unique_ptr<WritableFileWriter> dest_holder_;
149 unique_ptr<SequentialFileReader> source_holder_;
150 ReportCollector report_;
151 Writer writer_;
152 Reader reader_;
153
154 // Record metadata for testing initial offset functionality
155 static size_t initial_offset_record_sizes_[];
156 uint64_t initial_offset_last_record_offsets_[4];
157
158 public:
159 LogTest()
160 : reader_contents_(),
161 dest_holder_(test::GetWritableFileWriter(
162 new test::StringSink(&reader_contents_))),
163 source_holder_(
164 test::GetSequentialFileReader(new StringSource(reader_contents_))),
165 writer_(std::move(dest_holder_), 123, GetParam()),
166 reader_(NULL, std::move(source_holder_), &report_, true /*checksum*/,
167 0 /*initial_offset*/, 123) {
168 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
169 initial_offset_last_record_offsets_[0] = 0;
170 initial_offset_last_record_offsets_[1] = header_size + 10000;
171 initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
172 initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
173 (2 * log::kBlockSize - 1000) +
174 3 * header_size;
175 }
176
177 Slice* get_reader_contents() { return &reader_contents_; }
178
179 void Write(const std::string& msg) {
180 writer_.AddRecord(Slice(msg));
181 }
182
183 size_t WrittenBytes() const {
184 return dest_contents().size();
185 }
186
187 std::string Read(const WALRecoveryMode wal_recovery_mode =
188 WALRecoveryMode::kTolerateCorruptedTailRecords) {
189 std::string scratch;
190 Slice record;
191 if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
192 return record.ToString();
193 } else {
194 return "EOF";
195 }
196 }
197
198 void IncrementByte(int offset, int delta) {
199 dest_contents()[offset] += delta;
200 }
201
202 void SetByte(int offset, char new_byte) {
203 dest_contents()[offset] = new_byte;
204 }
205
206 void ShrinkSize(int bytes) {
207 auto dest =
208 dynamic_cast<test::StringSink*>(writer_.file()->writable_file());
209 assert(dest);
210 dest->Drop(bytes);
211 }
212
213 void FixChecksum(int header_offset, int len, bool recyclable) {
214 // Compute crc of type/len/data
215 int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
216 uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
217 header_size - 6 + len);
218 crc = crc32c::Mask(crc);
219 EncodeFixed32(&dest_contents()[header_offset], crc);
220 }
221
222 void ForceError(size_t position = 0) {
223 auto src = dynamic_cast<StringSource*>(reader_.file()->file());
224 src->force_error_ = true;
225 src->force_error_position_ = position;
226 }
227
228 size_t DroppedBytes() const {
229 return report_.dropped_bytes_;
230 }
231
232 std::string ReportMessage() const {
233 return report_.message_;
234 }
235
236 void ForceEOF(size_t position = 0) {
237 auto src = dynamic_cast<StringSource*>(reader_.file()->file());
238 src->force_eof_ = true;
239 src->force_eof_position_ = position;
240 }
241
242 void UnmarkEOF() {
243 auto src = dynamic_cast<StringSource*>(reader_.file()->file());
244 src->returned_partial_ = false;
245 reader_.UnmarkEOF();
246 }
247
248 bool IsEOF() {
249 return reader_.IsEOF();
250 }
251
252 // Returns OK iff recorded error message contains "msg"
253 std::string MatchError(const std::string& msg) const {
254 if (report_.message_.find(msg) == std::string::npos) {
255 return report_.message_;
256 } else {
257 return "OK";
258 }
259 }
260
261 void WriteInitialOffsetLog() {
262 for (int i = 0; i < 4; i++) {
263 std::string record(initial_offset_record_sizes_[i],
264 static_cast<char>('a' + i));
265 Write(record);
266 }
267 }
268
269 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
270 WriteInitialOffsetLog();
271 unique_ptr<SequentialFileReader> file_reader(
272 test::GetSequentialFileReader(new StringSource(reader_contents_)));
273 unique_ptr<Reader> offset_reader(
274 new Reader(NULL, std::move(file_reader), &report_,
275 true /*checksum*/, WrittenBytes() + offset_past_end, 123));
276 Slice record;
277 std::string scratch;
278 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
279 }
280
281 void CheckInitialOffsetRecord(uint64_t initial_offset,
282 int expected_record_offset) {
283 WriteInitialOffsetLog();
284 unique_ptr<SequentialFileReader> file_reader(
285 test::GetSequentialFileReader(new StringSource(reader_contents_)));
286 unique_ptr<Reader> offset_reader(
287 new Reader(NULL, std::move(file_reader), &report_,
288 true /*checksum*/, initial_offset, 123));
289 Slice record;
290 std::string scratch;
291 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
292 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
293 record.size());
294 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
295 offset_reader->LastRecordOffset());
296 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
297 }
298
299 };
300
301 size_t LogTest::initial_offset_record_sizes_[] =
302 {10000, // Two sizable records in first block
303 10000,
304 2 * log::kBlockSize - 1000, // Span three blocks
305 1};
306
307 TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
308
309 TEST_P(LogTest, ReadWrite) {
310 Write("foo");
311 Write("bar");
312 Write("");
313 Write("xxxx");
314 ASSERT_EQ("foo", Read());
315 ASSERT_EQ("bar", Read());
316 ASSERT_EQ("", Read());
317 ASSERT_EQ("xxxx", Read());
318 ASSERT_EQ("EOF", Read());
319 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
320 }
321
322 TEST_P(LogTest, ManyBlocks) {
323 for (int i = 0; i < 100000; i++) {
324 Write(NumberString(i));
325 }
326 for (int i = 0; i < 100000; i++) {
327 ASSERT_EQ(NumberString(i), Read());
328 }
329 ASSERT_EQ("EOF", Read());
330 }
331
332 TEST_P(LogTest, Fragmentation) {
333 Write("small");
334 Write(BigString("medium", 50000));
335 Write(BigString("large", 100000));
336 ASSERT_EQ("small", Read());
337 ASSERT_EQ(BigString("medium", 50000), Read());
338 ASSERT_EQ(BigString("large", 100000), Read());
339 ASSERT_EQ("EOF", Read());
340 }
341
342 TEST_P(LogTest, MarginalTrailer) {
343 // Make a trailer that is exactly the same length as an empty record.
344 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
345 const int n = kBlockSize - 2 * header_size;
346 Write(BigString("foo", n));
347 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
348 Write("");
349 Write("bar");
350 ASSERT_EQ(BigString("foo", n), Read());
351 ASSERT_EQ("", Read());
352 ASSERT_EQ("bar", Read());
353 ASSERT_EQ("EOF", Read());
354 }
355
356 TEST_P(LogTest, MarginalTrailer2) {
357 // Make a trailer that is exactly the same length as an empty record.
358 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
359 const int n = kBlockSize - 2 * header_size;
360 Write(BigString("foo", n));
361 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
362 Write("bar");
363 ASSERT_EQ(BigString("foo", n), Read());
364 ASSERT_EQ("bar", Read());
365 ASSERT_EQ("EOF", Read());
366 ASSERT_EQ(0U, DroppedBytes());
367 ASSERT_EQ("", ReportMessage());
368 }
369
370 TEST_P(LogTest, ShortTrailer) {
371 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
372 const int n = kBlockSize - 2 * header_size + 4;
373 Write(BigString("foo", n));
374 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
375 Write("");
376 Write("bar");
377 ASSERT_EQ(BigString("foo", n), Read());
378 ASSERT_EQ("", Read());
379 ASSERT_EQ("bar", Read());
380 ASSERT_EQ("EOF", Read());
381 }
382
383 TEST_P(LogTest, AlignedEof) {
384 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
385 const int n = kBlockSize - 2 * header_size + 4;
386 Write(BigString("foo", n));
387 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
388 ASSERT_EQ(BigString("foo", n), Read());
389 ASSERT_EQ("EOF", Read());
390 }
391
392 TEST_P(LogTest, RandomRead) {
393 const int N = 500;
394 Random write_rnd(301);
395 for (int i = 0; i < N; i++) {
396 Write(RandomSkewedString(i, &write_rnd));
397 }
398 Random read_rnd(301);
399 for (int i = 0; i < N; i++) {
400 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
401 }
402 ASSERT_EQ("EOF", Read());
403 }
404
405 // Tests of all the error paths in log_reader.cc follow:
406
407 TEST_P(LogTest, ReadError) {
408 Write("foo");
409 ForceError();
410 ASSERT_EQ("EOF", Read());
411 ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
412 ASSERT_EQ("OK", MatchError("read error"));
413 }
414
415 TEST_P(LogTest, BadRecordType) {
416 Write("foo");
417 // Type is stored in header[6]
418 IncrementByte(6, 100);
419 FixChecksum(0, 3, false);
420 ASSERT_EQ("EOF", Read());
421 ASSERT_EQ(3U, DroppedBytes());
422 ASSERT_EQ("OK", MatchError("unknown record type"));
423 }
424
425 TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
426 Write("foo");
427 ShrinkSize(4); // Drop all payload as well as a header byte
428 ASSERT_EQ("EOF", Read());
429 // Truncated last record is ignored, not treated as an error
430 ASSERT_EQ(0U, DroppedBytes());
431 ASSERT_EQ("", ReportMessage());
432 }
433
434 TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
435 Write("foo");
436 ShrinkSize(4); // Drop all payload as well as a header byte
437 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
438 // Truncated last record is ignored, not treated as an error
439 ASSERT_GT(DroppedBytes(), 0U);
440 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
441 }
442
443 TEST_P(LogTest, BadLength) {
444 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
445 const int kPayloadSize = kBlockSize - header_size;
446 Write(BigString("bar", kPayloadSize));
447 Write("foo");
448 // Least significant size byte is stored in header[4].
449 IncrementByte(4, 1);
450 if (!GetParam()) {
451 ASSERT_EQ("foo", Read());
452 ASSERT_EQ(kBlockSize, DroppedBytes());
453 ASSERT_EQ("OK", MatchError("bad record length"));
454 } else {
455 ASSERT_EQ("EOF", Read());
456 }
457 }
458
459 TEST_P(LogTest, BadLengthAtEndIsIgnored) {
460 Write("foo");
461 ShrinkSize(1);
462 ASSERT_EQ("EOF", Read());
463 ASSERT_EQ(0U, DroppedBytes());
464 ASSERT_EQ("", ReportMessage());
465 }
466
467 TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
468 Write("foo");
469 ShrinkSize(1);
470 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
471 ASSERT_GT(DroppedBytes(), 0U);
472 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
473 }
474
475 TEST_P(LogTest, ChecksumMismatch) {
476 Write("foooooo");
477 IncrementByte(0, 14);
478 ASSERT_EQ("EOF", Read());
479 if (!GetParam()) {
480 ASSERT_EQ(14U, DroppedBytes());
481 ASSERT_EQ("OK", MatchError("checksum mismatch"));
482 } else {
483 ASSERT_EQ(0U, DroppedBytes());
484 ASSERT_EQ("", ReportMessage());
485 }
486 }
487
488 TEST_P(LogTest, UnexpectedMiddleType) {
489 Write("foo");
490 SetByte(6, GetParam() ? kRecyclableMiddleType : kMiddleType);
491 FixChecksum(0, 3, !!GetParam());
492 ASSERT_EQ("EOF", Read());
493 ASSERT_EQ(3U, DroppedBytes());
494 ASSERT_EQ("OK", MatchError("missing start"));
495 }
496
497 TEST_P(LogTest, UnexpectedLastType) {
498 Write("foo");
499 SetByte(6, GetParam() ? kRecyclableLastType : kLastType);
500 FixChecksum(0, 3, !!GetParam());
501 ASSERT_EQ("EOF", Read());
502 ASSERT_EQ(3U, DroppedBytes());
503 ASSERT_EQ("OK", MatchError("missing start"));
504 }
505
506 TEST_P(LogTest, UnexpectedFullType) {
507 Write("foo");
508 Write("bar");
509 SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
510 FixChecksum(0, 3, !!GetParam());
511 ASSERT_EQ("bar", Read());
512 ASSERT_EQ("EOF", Read());
513 ASSERT_EQ(3U, DroppedBytes());
514 ASSERT_EQ("OK", MatchError("partial record without end"));
515 }
516
517 TEST_P(LogTest, UnexpectedFirstType) {
518 Write("foo");
519 Write(BigString("bar", 100000));
520 SetByte(6, GetParam() ? kRecyclableFirstType : kFirstType);
521 FixChecksum(0, 3, !!GetParam());
522 ASSERT_EQ(BigString("bar", 100000), Read());
523 ASSERT_EQ("EOF", Read());
524 ASSERT_EQ(3U, DroppedBytes());
525 ASSERT_EQ("OK", MatchError("partial record without end"));
526 }
527
528 TEST_P(LogTest, MissingLastIsIgnored) {
529 Write(BigString("bar", kBlockSize));
530 // Remove the LAST block, including header.
531 ShrinkSize(14);
532 ASSERT_EQ("EOF", Read());
533 ASSERT_EQ("", ReportMessage());
534 ASSERT_EQ(0U, DroppedBytes());
535 }
536
537 TEST_P(LogTest, MissingLastIsNotIgnored) {
538 Write(BigString("bar", kBlockSize));
539 // Remove the LAST block, including header.
540 ShrinkSize(14);
541 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
542 ASSERT_GT(DroppedBytes(), 0U);
543 ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
544 }
545
546 TEST_P(LogTest, PartialLastIsIgnored) {
547 Write(BigString("bar", kBlockSize));
548 // Cause a bad record length in the LAST block.
549 ShrinkSize(1);
550 ASSERT_EQ("EOF", Read());
551 ASSERT_EQ("", ReportMessage());
552 ASSERT_EQ(0U, DroppedBytes());
553 }
554
555 TEST_P(LogTest, PartialLastIsNotIgnored) {
556 Write(BigString("bar", kBlockSize));
557 // Cause a bad record length in the LAST block.
558 ShrinkSize(1);
559 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
560 ASSERT_GT(DroppedBytes(), 0U);
561 ASSERT_EQ("OK", MatchError(
562 "Corruption: truncated headerCorruption: "
563 "error reading trailing data"));
564 }
565
566 TEST_P(LogTest, ErrorJoinsRecords) {
567 // Consider two fragmented records:
568 // first(R1) last(R1) first(R2) last(R2)
569 // where the middle two fragments disappear. We do not want
570 // first(R1),last(R2) to get joined and returned as a valid record.
571
572 // Write records that span two blocks
573 Write(BigString("foo", kBlockSize));
574 Write(BigString("bar", kBlockSize));
575 Write("correct");
576
577 // Wipe the middle block
578 for (unsigned int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
579 SetByte(offset, 'x');
580 }
581
582 if (!GetParam()) {
583 ASSERT_EQ("correct", Read());
584 ASSERT_EQ("EOF", Read());
585 size_t dropped = DroppedBytes();
586 ASSERT_LE(dropped, 2 * kBlockSize + 100);
587 ASSERT_GE(dropped, 2 * kBlockSize);
588 } else {
589 ASSERT_EQ("EOF", Read());
590 }
591 }
592
593 TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
594
595 TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
596
597 TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
598
599 TEST_P(LogTest, ReadSecondStart) {
600 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
601 CheckInitialOffsetRecord(10000 + header_size, 1);
602 }
603
604 TEST_P(LogTest, ReadThirdOneOff) {
605 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
606 CheckInitialOffsetRecord(10000 + header_size + 1, 2);
607 }
608
609 TEST_P(LogTest, ReadThirdStart) {
610 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
611 CheckInitialOffsetRecord(20000 + 2 * header_size, 2);
612 }
613
614 TEST_P(LogTest, ReadFourthOneOff) {
615 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
616 CheckInitialOffsetRecord(20000 + 2 * header_size + 1, 3);
617 }
618
619 TEST_P(LogTest, ReadFourthFirstBlockTrailer) {
620 CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
621 }
622
623 TEST_P(LogTest, ReadFourthMiddleBlock) {
624 CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
625 }
626
627 TEST_P(LogTest, ReadFourthLastBlock) {
628 CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
629 }
630
631 TEST_P(LogTest, ReadFourthStart) {
632 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
633 CheckInitialOffsetRecord(
634 2 * (header_size + 1000) + (2 * log::kBlockSize - 1000) + 3 * header_size,
635 3);
636 }
637
638 TEST_P(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
639
640 TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
641
642 TEST_P(LogTest, ClearEofSingleBlock) {
643 Write("foo");
644 Write("bar");
645 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
646 ForceEOF(3 + header_size + 2);
647 ASSERT_EQ("foo", Read());
648 UnmarkEOF();
649 ASSERT_EQ("bar", Read());
650 ASSERT_TRUE(IsEOF());
651 ASSERT_EQ("EOF", Read());
652 Write("xxx");
653 UnmarkEOF();
654 ASSERT_EQ("xxx", Read());
655 ASSERT_TRUE(IsEOF());
656 }
657
658 TEST_P(LogTest, ClearEofMultiBlock) {
659 size_t num_full_blocks = 5;
660 int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
661 size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
662 Write(BigString("foo", n));
663 Write(BigString("bar", n));
664 ForceEOF(n + num_full_blocks * header_size + header_size + 3);
665 ASSERT_EQ(BigString("foo", n), Read());
666 ASSERT_TRUE(IsEOF());
667 UnmarkEOF();
668 ASSERT_EQ(BigString("bar", n), Read());
669 ASSERT_TRUE(IsEOF());
670 Write(BigString("xxx", n));
671 UnmarkEOF();
672 ASSERT_EQ(BigString("xxx", n), Read());
673 ASSERT_TRUE(IsEOF());
674 }
675
676 TEST_P(LogTest, ClearEofError) {
677 // If an error occurs during Read() in UnmarkEOF(), the records contained
678 // in the buffer should be returned on subsequent calls of ReadRecord()
679 // until no more full records are left, whereafter ReadRecord() should return
680 // false to indicate that it cannot read any further.
681
682 Write("foo");
683 Write("bar");
684 UnmarkEOF();
685 ASSERT_EQ("foo", Read());
686 ASSERT_TRUE(IsEOF());
687 Write("xxx");
688 ForceError(0);
689 UnmarkEOF();
690 ASSERT_EQ("bar", Read());
691 ASSERT_EQ("EOF", Read());
692 }
693
694 TEST_P(LogTest, ClearEofError2) {
695 Write("foo");
696 Write("bar");
697 UnmarkEOF();
698 ASSERT_EQ("foo", Read());
699 Write("xxx");
700 ForceError(3);
701 UnmarkEOF();
702 ASSERT_EQ("bar", Read());
703 ASSERT_EQ("EOF", Read());
704 ASSERT_EQ(3U, DroppedBytes());
705 ASSERT_EQ("OK", MatchError("read error"));
706 }
707
708 TEST_P(LogTest, Recycle) {
709 if (!GetParam()) {
710 return; // test is only valid for recycled logs
711 }
712 Write("foo");
713 Write("bar");
714 Write("baz");
715 Write("bif");
716 Write("blitz");
717 while (get_reader_contents()->size() < log::kBlockSize * 2) {
718 Write("xxxxxxxxxxxxxxxx");
719 }
720 unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
721 new test::OverwritingStringSink(get_reader_contents())));
722 Writer recycle_writer(std::move(dest_holder), 123, true);
723 recycle_writer.AddRecord(Slice("foooo"));
724 recycle_writer.AddRecord(Slice("bar"));
725 ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
726 ASSERT_EQ("foooo", Read());
727 ASSERT_EQ("bar", Read());
728 ASSERT_EQ("EOF", Read());
729 }
730
// Run every LogTest case once with parameter 0 and once with parameter 2.
// The fixture forwards the value to the Writer constructor and otherwise only
// tests it for being non-zero (non-zero selects the recyclable header size).
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
732
733 } // namespace log
734 } // namespace rocksdb
735
736 int main(int argc, char** argv) {
737 ::testing::InitGoogleTest(&argc, argv);
738 return RUN_ALL_TESTS();
739 }