// Source: git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/log_test.cc
// Object id: fd237b030e7366e6714d0057e5a426cf2cbc074d
// Path: [ceph.git] / ceph / src / rocksdb / db / log_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "rocksdb/env.h"
13 #include "util/coding.h"
14 #include "util/crc32c.h"
15 #include "util/file_reader_writer.h"
16 #include "util/random.h"
17 #include "util/testharness.h"
18 #include "util/testutil.h"
19
20 namespace rocksdb {
21 namespace log {
22
// Construct a string of exactly `n` bytes by repeating `partial_string` and
// truncating the final repetition.
//
// If `partial_string` is empty there is nothing to repeat; instead of looping
// forever the result is `n` NUL bytes (what std::string::resize() pads with).
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The loop overshoots by at most one repetition before the final resize.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
33
// Construct a string from a number: the decimal representation of `n`
// followed by a '.' (e.g. 42 -> "42.").
static std::string NumberString(int n) {
  // std::to_string(int) yields the same text as the "%d" conversion and
  // avoids a fixed-size scratch buffer.
  return std::to_string(n) + ".";
}
40
41 // Return a skewed potentially long string
42 static std::string RandomSkewedString(int i, Random* rnd) {
43 return BigString(NumberString(i), rnd->Skewed(17));
44 }
45
46 // Param type is tuple<int, bool>
47 // get<0>(tuple): non-zero if recycling log, zero if regular log
48 // get<1>(tuple): true if allow retry after read EOF, false otherwise
49 class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
50 private:
51 class StringSource : public SequentialFile {
52 public:
53 Slice& contents_;
54 bool force_error_;
55 size_t force_error_position_;
56 bool force_eof_;
57 size_t force_eof_position_;
58 bool returned_partial_;
59 bool fail_after_read_partial_;
60 explicit StringSource(Slice& contents, bool fail_after_read_partial)
61 : contents_(contents),
62 force_error_(false),
63 force_error_position_(0),
64 force_eof_(false),
65 force_eof_position_(0),
66 returned_partial_(false),
67 fail_after_read_partial_(fail_after_read_partial) {}
68
69 Status Read(size_t n, Slice* result, char* scratch) override {
70 if (fail_after_read_partial_) {
71 EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
72 }
73
74 if (force_error_) {
75 if (force_error_position_ >= n) {
76 force_error_position_ -= n;
77 } else {
78 *result = Slice(contents_.data(), force_error_position_);
79 contents_.remove_prefix(force_error_position_);
80 force_error_ = false;
81 returned_partial_ = true;
82 return Status::Corruption("read error");
83 }
84 }
85
86 if (contents_.size() < n) {
87 n = contents_.size();
88 returned_partial_ = true;
89 }
90
91 if (force_eof_) {
92 if (force_eof_position_ >= n) {
93 force_eof_position_ -= n;
94 } else {
95 force_eof_ = false;
96 n = force_eof_position_;
97 returned_partial_ = true;
98 }
99 }
100
101 // By using scratch we ensure that caller has control over the
102 // lifetime of result.data()
103 memcpy(scratch, contents_.data(), n);
104 *result = Slice(scratch, n);
105
106 contents_.remove_prefix(n);
107 return Status::OK();
108 }
109
110 Status Skip(uint64_t n) override {
111 if (n > contents_.size()) {
112 contents_.clear();
113 return Status::NotFound("in-memory file skipepd past end");
114 }
115
116 contents_.remove_prefix(n);
117
118 return Status::OK();
119 }
120 };
121
122 class ReportCollector : public Reader::Reporter {
123 public:
124 size_t dropped_bytes_;
125 std::string message_;
126
127 ReportCollector() : dropped_bytes_(0) { }
128 void Corruption(size_t bytes, const Status& status) override {
129 dropped_bytes_ += bytes;
130 message_.append(status.ToString());
131 }
132 };
133
134 std::string& dest_contents() {
135 auto dest =
136 dynamic_cast<test::StringSink*>(writer_.file()->writable_file());
137 assert(dest);
138 return dest->contents_;
139 }
140
141 const std::string& dest_contents() const {
142 auto dest =
143 dynamic_cast<const test::StringSink*>(writer_.file()->writable_file());
144 assert(dest);
145 return dest->contents_;
146 }
147
148 void reset_source_contents() {
149 auto src = dynamic_cast<StringSource*>(reader_->file()->file());
150 assert(src);
151 src->contents_ = dest_contents();
152 }
153
154 Slice reader_contents_;
155 std::unique_ptr<WritableFileWriter> dest_holder_;
156 std::unique_ptr<SequentialFileReader> source_holder_;
157 ReportCollector report_;
158 Writer writer_;
159 std::unique_ptr<Reader> reader_;
160
161 protected:
162 bool allow_retry_read_;
163
164 public:
165 LogTest()
166 : reader_contents_(),
167 dest_holder_(test::GetWritableFileWriter(
168 new test::StringSink(&reader_contents_), "" /* don't care */)),
169 source_holder_(test::GetSequentialFileReader(
170 new StringSource(reader_contents_, !std::get<1>(GetParam())),
171 "" /* file name */)),
172 writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
173 allow_retry_read_(std::get<1>(GetParam())) {
174 if (allow_retry_read_) {
175 reader_.reset(new FragmentBufferedReader(
176 nullptr, std::move(source_holder_), &report_, true /* checksum */,
177 123 /* log_number */));
178 } else {
179 reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
180 true /* checksum */, 123 /* log_number */));
181 }
182 }
183
184 Slice* get_reader_contents() { return &reader_contents_; }
185
186 void Write(const std::string& msg) {
187 writer_.AddRecord(Slice(msg));
188 }
189
190 size_t WrittenBytes() const {
191 return dest_contents().size();
192 }
193
194 std::string Read(const WALRecoveryMode wal_recovery_mode =
195 WALRecoveryMode::kTolerateCorruptedTailRecords) {
196 std::string scratch;
197 Slice record;
198 bool ret = false;
199 ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
200 if (ret) {
201 return record.ToString();
202 } else {
203 return "EOF";
204 }
205 }
206
207 void IncrementByte(int offset, char delta) {
208 dest_contents()[offset] += delta;
209 }
210
211 void SetByte(int offset, char new_byte) {
212 dest_contents()[offset] = new_byte;
213 }
214
215 void ShrinkSize(int bytes) {
216 auto dest =
217 dynamic_cast<test::StringSink*>(writer_.file()->writable_file());
218 assert(dest);
219 dest->Drop(bytes);
220 }
221
222 void FixChecksum(int header_offset, int len, bool recyclable) {
223 // Compute crc of type/len/data
224 int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
225 uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
226 header_size - 6 + len);
227 crc = crc32c::Mask(crc);
228 EncodeFixed32(&dest_contents()[header_offset], crc);
229 }
230
231 void ForceError(size_t position = 0) {
232 auto src = dynamic_cast<StringSource*>(reader_->file()->file());
233 src->force_error_ = true;
234 src->force_error_position_ = position;
235 }
236
237 size_t DroppedBytes() const {
238 return report_.dropped_bytes_;
239 }
240
241 std::string ReportMessage() const {
242 return report_.message_;
243 }
244
245 void ForceEOF(size_t position = 0) {
246 auto src = dynamic_cast<StringSource*>(reader_->file()->file());
247 src->force_eof_ = true;
248 src->force_eof_position_ = position;
249 }
250
251 void UnmarkEOF() {
252 auto src = dynamic_cast<StringSource*>(reader_->file()->file());
253 src->returned_partial_ = false;
254 reader_->UnmarkEOF();
255 }
256
257 bool IsEOF() { return reader_->IsEOF(); }
258
259 // Returns OK iff recorded error message contains "msg"
260 std::string MatchError(const std::string& msg) const {
261 if (report_.message_.find(msg) == std::string::npos) {
262 return report_.message_;
263 } else {
264 return "OK";
265 }
266 }
267 };
268
269 TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
270
271 TEST_P(LogTest, ReadWrite) {
272 Write("foo");
273 Write("bar");
274 Write("");
275 Write("xxxx");
276 ASSERT_EQ("foo", Read());
277 ASSERT_EQ("bar", Read());
278 ASSERT_EQ("", Read());
279 ASSERT_EQ("xxxx", Read());
280 ASSERT_EQ("EOF", Read());
281 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
282 }
283
284 TEST_P(LogTest, ManyBlocks) {
285 for (int i = 0; i < 100000; i++) {
286 Write(NumberString(i));
287 }
288 for (int i = 0; i < 100000; i++) {
289 ASSERT_EQ(NumberString(i), Read());
290 }
291 ASSERT_EQ("EOF", Read());
292 }
293
294 TEST_P(LogTest, Fragmentation) {
295 Write("small");
296 Write(BigString("medium", 50000));
297 Write(BigString("large", 100000));
298 ASSERT_EQ("small", Read());
299 ASSERT_EQ(BigString("medium", 50000), Read());
300 ASSERT_EQ(BigString("large", 100000), Read());
301 ASSERT_EQ("EOF", Read());
302 }
303
304 TEST_P(LogTest, MarginalTrailer) {
305 // Make a trailer that is exactly the same length as an empty record.
306 int header_size =
307 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
308 const int n = kBlockSize - 2 * header_size;
309 Write(BigString("foo", n));
310 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
311 Write("");
312 Write("bar");
313 ASSERT_EQ(BigString("foo", n), Read());
314 ASSERT_EQ("", Read());
315 ASSERT_EQ("bar", Read());
316 ASSERT_EQ("EOF", Read());
317 }
318
319 TEST_P(LogTest, MarginalTrailer2) {
320 // Make a trailer that is exactly the same length as an empty record.
321 int header_size =
322 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
323 const int n = kBlockSize - 2 * header_size;
324 Write(BigString("foo", n));
325 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
326 Write("bar");
327 ASSERT_EQ(BigString("foo", n), Read());
328 ASSERT_EQ("bar", Read());
329 ASSERT_EQ("EOF", Read());
330 ASSERT_EQ(0U, DroppedBytes());
331 ASSERT_EQ("", ReportMessage());
332 }
333
334 TEST_P(LogTest, ShortTrailer) {
335 int header_size =
336 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
337 const int n = kBlockSize - 2 * header_size + 4;
338 Write(BigString("foo", n));
339 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
340 Write("");
341 Write("bar");
342 ASSERT_EQ(BigString("foo", n), Read());
343 ASSERT_EQ("", Read());
344 ASSERT_EQ("bar", Read());
345 ASSERT_EQ("EOF", Read());
346 }
347
348 TEST_P(LogTest, AlignedEof) {
349 int header_size =
350 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
351 const int n = kBlockSize - 2 * header_size + 4;
352 Write(BigString("foo", n));
353 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
354 ASSERT_EQ(BigString("foo", n), Read());
355 ASSERT_EQ("EOF", Read());
356 }
357
358 TEST_P(LogTest, RandomRead) {
359 const int N = 500;
360 Random write_rnd(301);
361 for (int i = 0; i < N; i++) {
362 Write(RandomSkewedString(i, &write_rnd));
363 }
364 Random read_rnd(301);
365 for (int i = 0; i < N; i++) {
366 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
367 }
368 ASSERT_EQ("EOF", Read());
369 }
370
371 // Tests of all the error paths in log_reader.cc follow:
372
373 TEST_P(LogTest, ReadError) {
374 Write("foo");
375 ForceError();
376 ASSERT_EQ("EOF", Read());
377 ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
378 ASSERT_EQ("OK", MatchError("read error"));
379 }
380
381 TEST_P(LogTest, BadRecordType) {
382 Write("foo");
383 // Type is stored in header[6]
384 IncrementByte(6, 100);
385 FixChecksum(0, 3, false);
386 ASSERT_EQ("EOF", Read());
387 ASSERT_EQ(3U, DroppedBytes());
388 ASSERT_EQ("OK", MatchError("unknown record type"));
389 }
390
391 TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
392 Write("foo");
393 ShrinkSize(4); // Drop all payload as well as a header byte
394 ASSERT_EQ("EOF", Read());
395 // Truncated last record is ignored, not treated as an error
396 ASSERT_EQ(0U, DroppedBytes());
397 ASSERT_EQ("", ReportMessage());
398 }
399
400 TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
401 if (allow_retry_read_) {
402 // If read retry is allowed, then truncated trailing record should not
403 // raise an error.
404 return;
405 }
406 Write("foo");
407 ShrinkSize(4); // Drop all payload as well as a header byte
408 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
409 // Truncated last record is ignored, not treated as an error
410 ASSERT_GT(DroppedBytes(), 0U);
411 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
412 }
413
414 TEST_P(LogTest, BadLength) {
415 if (allow_retry_read_) {
416 // If read retry is allowed, then we should not raise an error when the
417 // record length specified in header is longer than data currently
418 // available. It's possible that the body of the record is not written yet.
419 return;
420 }
421 bool recyclable_log = (std::get<0>(GetParam()) != 0);
422 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
423 const int kPayloadSize = kBlockSize - header_size;
424 Write(BigString("bar", kPayloadSize));
425 Write("foo");
426 // Least significant size byte is stored in header[4].
427 IncrementByte(4, 1);
428 if (!recyclable_log) {
429 ASSERT_EQ("foo", Read());
430 ASSERT_EQ(kBlockSize, DroppedBytes());
431 ASSERT_EQ("OK", MatchError("bad record length"));
432 } else {
433 ASSERT_EQ("EOF", Read());
434 }
435 }
436
437 TEST_P(LogTest, BadLengthAtEndIsIgnored) {
438 if (allow_retry_read_) {
439 // If read retry is allowed, then we should not raise an error when the
440 // record length specified in header is longer than data currently
441 // available. It's possible that the body of the record is not written yet.
442 return;
443 }
444 Write("foo");
445 ShrinkSize(1);
446 ASSERT_EQ("EOF", Read());
447 ASSERT_EQ(0U, DroppedBytes());
448 ASSERT_EQ("", ReportMessage());
449 }
450
451 TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
452 if (allow_retry_read_) {
453 // If read retry is allowed, then we should not raise an error when the
454 // record length specified in header is longer than data currently
455 // available. It's possible that the body of the record is not written yet.
456 return;
457 }
458 Write("foo");
459 ShrinkSize(1);
460 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
461 ASSERT_GT(DroppedBytes(), 0U);
462 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
463 }
464
465 TEST_P(LogTest, ChecksumMismatch) {
466 Write("foooooo");
467 IncrementByte(0, 14);
468 ASSERT_EQ("EOF", Read());
469 bool recyclable_log = (std::get<0>(GetParam()) != 0);
470 if (!recyclable_log) {
471 ASSERT_EQ(14U, DroppedBytes());
472 ASSERT_EQ("OK", MatchError("checksum mismatch"));
473 } else {
474 ASSERT_EQ(0U, DroppedBytes());
475 ASSERT_EQ("", ReportMessage());
476 }
477 }
478
479 TEST_P(LogTest, UnexpectedMiddleType) {
480 Write("foo");
481 bool recyclable_log = (std::get<0>(GetParam()) != 0);
482 SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
483 : kMiddleType));
484 FixChecksum(0, 3, !!recyclable_log);
485 ASSERT_EQ("EOF", Read());
486 ASSERT_EQ(3U, DroppedBytes());
487 ASSERT_EQ("OK", MatchError("missing start"));
488 }
489
490 TEST_P(LogTest, UnexpectedLastType) {
491 Write("foo");
492 bool recyclable_log = (std::get<0>(GetParam()) != 0);
493 SetByte(6,
494 static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
495 FixChecksum(0, 3, !!recyclable_log);
496 ASSERT_EQ("EOF", Read());
497 ASSERT_EQ(3U, DroppedBytes());
498 ASSERT_EQ("OK", MatchError("missing start"));
499 }
500
501 TEST_P(LogTest, UnexpectedFullType) {
502 Write("foo");
503 Write("bar");
504 bool recyclable_log = (std::get<0>(GetParam()) != 0);
505 SetByte(
506 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
507 FixChecksum(0, 3, !!recyclable_log);
508 ASSERT_EQ("bar", Read());
509 ASSERT_EQ("EOF", Read());
510 ASSERT_EQ(3U, DroppedBytes());
511 ASSERT_EQ("OK", MatchError("partial record without end"));
512 }
513
514 TEST_P(LogTest, UnexpectedFirstType) {
515 Write("foo");
516 Write(BigString("bar", 100000));
517 bool recyclable_log = (std::get<0>(GetParam()) != 0);
518 SetByte(
519 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
520 FixChecksum(0, 3, !!recyclable_log);
521 ASSERT_EQ(BigString("bar", 100000), Read());
522 ASSERT_EQ("EOF", Read());
523 ASSERT_EQ(3U, DroppedBytes());
524 ASSERT_EQ("OK", MatchError("partial record without end"));
525 }
526
527 TEST_P(LogTest, MissingLastIsIgnored) {
528 Write(BigString("bar", kBlockSize));
529 // Remove the LAST block, including header.
530 ShrinkSize(14);
531 ASSERT_EQ("EOF", Read());
532 ASSERT_EQ("", ReportMessage());
533 ASSERT_EQ(0U, DroppedBytes());
534 }
535
536 TEST_P(LogTest, MissingLastIsNotIgnored) {
537 if (allow_retry_read_) {
538 // If read retry is allowed, then truncated trailing record should not
539 // raise an error.
540 return;
541 }
542 Write(BigString("bar", kBlockSize));
543 // Remove the LAST block, including header.
544 ShrinkSize(14);
545 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
546 ASSERT_GT(DroppedBytes(), 0U);
547 ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
548 }
549
550 TEST_P(LogTest, PartialLastIsIgnored) {
551 Write(BigString("bar", kBlockSize));
552 // Cause a bad record length in the LAST block.
553 ShrinkSize(1);
554 ASSERT_EQ("EOF", Read());
555 ASSERT_EQ("", ReportMessage());
556 ASSERT_EQ(0U, DroppedBytes());
557 }
558
559 TEST_P(LogTest, PartialLastIsNotIgnored) {
560 if (allow_retry_read_) {
561 // If read retry is allowed, then truncated trailing record should not
562 // raise an error.
563 return;
564 }
565 Write(BigString("bar", kBlockSize));
566 // Cause a bad record length in the LAST block.
567 ShrinkSize(1);
568 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
569 ASSERT_GT(DroppedBytes(), 0U);
570 ASSERT_EQ("OK", MatchError(
571 "Corruption: truncated headerCorruption: "
572 "error reading trailing data"));
573 }
574
575 TEST_P(LogTest, ErrorJoinsRecords) {
576 // Consider two fragmented records:
577 // first(R1) last(R1) first(R2) last(R2)
578 // where the middle two fragments disappear. We do not want
579 // first(R1),last(R2) to get joined and returned as a valid record.
580
581 // Write records that span two blocks
582 Write(BigString("foo", kBlockSize));
583 Write(BigString("bar", kBlockSize));
584 Write("correct");
585
586 // Wipe the middle block
587 for (unsigned int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
588 SetByte(offset, 'x');
589 }
590
591 bool recyclable_log = (std::get<0>(GetParam()) != 0);
592 if (!recyclable_log) {
593 ASSERT_EQ("correct", Read());
594 ASSERT_EQ("EOF", Read());
595 size_t dropped = DroppedBytes();
596 ASSERT_LE(dropped, 2 * kBlockSize + 100);
597 ASSERT_GE(dropped, 2 * kBlockSize);
598 } else {
599 ASSERT_EQ("EOF", Read());
600 }
601 }
602
603 TEST_P(LogTest, ClearEofSingleBlock) {
604 Write("foo");
605 Write("bar");
606 bool recyclable_log = (std::get<0>(GetParam()) != 0);
607 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
608 ForceEOF(3 + header_size + 2);
609 ASSERT_EQ("foo", Read());
610 UnmarkEOF();
611 ASSERT_EQ("bar", Read());
612 ASSERT_TRUE(IsEOF());
613 ASSERT_EQ("EOF", Read());
614 Write("xxx");
615 UnmarkEOF();
616 ASSERT_EQ("xxx", Read());
617 ASSERT_TRUE(IsEOF());
618 }
619
620 TEST_P(LogTest, ClearEofMultiBlock) {
621 size_t num_full_blocks = 5;
622 bool recyclable_log = (std::get<0>(GetParam()) != 0);
623 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
624 size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
625 Write(BigString("foo", n));
626 Write(BigString("bar", n));
627 ForceEOF(n + num_full_blocks * header_size + header_size + 3);
628 ASSERT_EQ(BigString("foo", n), Read());
629 ASSERT_TRUE(IsEOF());
630 UnmarkEOF();
631 ASSERT_EQ(BigString("bar", n), Read());
632 ASSERT_TRUE(IsEOF());
633 Write(BigString("xxx", n));
634 UnmarkEOF();
635 ASSERT_EQ(BigString("xxx", n), Read());
636 ASSERT_TRUE(IsEOF());
637 }
638
639 TEST_P(LogTest, ClearEofError) {
640 // If an error occurs during Read() in UnmarkEOF(), the records contained
641 // in the buffer should be returned on subsequent calls of ReadRecord()
642 // until no more full records are left, whereafter ReadRecord() should return
643 // false to indicate that it cannot read any further.
644
645 Write("foo");
646 Write("bar");
647 UnmarkEOF();
648 ASSERT_EQ("foo", Read());
649 ASSERT_TRUE(IsEOF());
650 Write("xxx");
651 ForceError(0);
652 UnmarkEOF();
653 ASSERT_EQ("bar", Read());
654 ASSERT_EQ("EOF", Read());
655 }
656
657 TEST_P(LogTest, ClearEofError2) {
658 Write("foo");
659 Write("bar");
660 UnmarkEOF();
661 ASSERT_EQ("foo", Read());
662 Write("xxx");
663 ForceError(3);
664 UnmarkEOF();
665 ASSERT_EQ("bar", Read());
666 ASSERT_EQ("EOF", Read());
667 ASSERT_EQ(3U, DroppedBytes());
668 ASSERT_EQ("OK", MatchError("read error"));
669 }
670
671 TEST_P(LogTest, Recycle) {
672 bool recyclable_log = (std::get<0>(GetParam()) != 0);
673 if (!recyclable_log) {
674 return; // test is only valid for recycled logs
675 }
676 Write("foo");
677 Write("bar");
678 Write("baz");
679 Write("bif");
680 Write("blitz");
681 while (get_reader_contents()->size() < log::kBlockSize * 2) {
682 Write("xxxxxxxxxxxxxxxx");
683 }
684 std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
685 new test::OverwritingStringSink(get_reader_contents()),
686 "" /* don't care */));
687 Writer recycle_writer(std::move(dest_holder), 123, true);
688 recycle_writer.AddRecord(Slice("foooo"));
689 recycle_writer.AddRecord(Slice("bar"));
690 ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
691 ASSERT_EQ("foooo", Read());
692 ASSERT_EQ("bar", Read());
693 ASSERT_EQ("EOF", Read());
694 }
695
696 INSTANTIATE_TEST_CASE_P(bool, LogTest,
697 ::testing::Values(std::make_tuple(0, false),
698 std::make_tuple(0, true),
699 std::make_tuple(1, false),
700 std::make_tuple(1, true)));
701
702 class RetriableLogTest : public ::testing::TestWithParam<int> {
703 private:
704 class ReportCollector : public Reader::Reporter {
705 public:
706 size_t dropped_bytes_;
707 std::string message_;
708
709 ReportCollector() : dropped_bytes_(0) {}
710 void Corruption(size_t bytes, const Status& status) override {
711 dropped_bytes_ += bytes;
712 message_.append(status.ToString());
713 }
714 };
715
716 Slice contents_;
717 std::unique_ptr<WritableFileWriter> dest_holder_;
718 std::unique_ptr<Writer> log_writer_;
719 Env* env_;
720 EnvOptions env_options_;
721 const std::string test_dir_;
722 const std::string log_file_;
723 std::unique_ptr<WritableFileWriter> writer_;
724 std::unique_ptr<SequentialFileReader> reader_;
725 ReportCollector report_;
726 std::unique_ptr<FragmentBufferedReader> log_reader_;
727
728 public:
729 RetriableLogTest()
730 : contents_(),
731 dest_holder_(nullptr),
732 log_writer_(nullptr),
733 env_(Env::Default()),
734 test_dir_(test::PerThreadDBPath("retriable_log_test")),
735 log_file_(test_dir_ + "/log"),
736 writer_(nullptr),
737 reader_(nullptr),
738 log_reader_(nullptr) {}
739
740 Status SetupTestEnv() {
741 dest_holder_.reset(test::GetWritableFileWriter(
742 new test::StringSink(&contents_), "" /* file name */));
743 assert(dest_holder_ != nullptr);
744 log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
745 assert(log_writer_ != nullptr);
746
747 Status s;
748 s = env_->CreateDirIfMissing(test_dir_);
749 std::unique_ptr<WritableFile> writable_file;
750 if (s.ok()) {
751 s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
752 }
753 if (s.ok()) {
754 writer_.reset(new WritableFileWriter(std::move(writable_file), log_file_,
755 env_options_));
756 assert(writer_ != nullptr);
757 }
758 std::unique_ptr<SequentialFile> seq_file;
759 if (s.ok()) {
760 s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
761 }
762 if (s.ok()) {
763 reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
764 assert(reader_ != nullptr);
765 log_reader_.reset(new FragmentBufferedReader(
766 nullptr, std::move(reader_), &report_, true /* checksum */,
767 123 /* log_number */));
768 assert(log_reader_ != nullptr);
769 }
770 return s;
771 }
772
773 std::string contents() {
774 auto file =
775 dynamic_cast<test::StringSink*>(log_writer_->file()->writable_file());
776 assert(file != nullptr);
777 return file->contents_;
778 }
779
780 void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
781
782 void Write(const Slice& data) {
783 writer_->Append(data);
784 writer_->Sync(true);
785 }
786
787 bool TryRead(std::string* result) {
788 assert(result != nullptr);
789 result->clear();
790 std::string scratch;
791 Slice record;
792 bool r = log_reader_->ReadRecord(&record, &scratch);
793 if (r) {
794 result->assign(record.data(), record.size());
795 return true;
796 } else {
797 return false;
798 }
799 }
800 };
801
802 TEST_P(RetriableLogTest, TailLog_PartialHeader) {
803 ASSERT_OK(SetupTestEnv());
804 std::vector<int> remaining_bytes_in_last_record;
805 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
806 bool eof = false;
807 SyncPoint::GetInstance()->DisableProcessing();
808 SyncPoint::GetInstance()->LoadDependency(
809 {{"RetriableLogTest::TailLog:AfterPart1",
810 "RetriableLogTest::TailLog:BeforeReadRecord"},
811 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
812 "RetriableLogTest::TailLog:BeforePart2"}});
813 SyncPoint::GetInstance()->ClearAllCallBacks();
814 SyncPoint::GetInstance()->SetCallBack(
815 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
816 [&](void* /*arg*/) { eof = true; });
817 SyncPoint::GetInstance()->EnableProcessing();
818
819 size_t delta = header_size - 1;
820 port::Thread log_writer_thread([&]() {
821 size_t old_sz = contents().size();
822 Encode("foo");
823 size_t new_sz = contents().size();
824 std::string part1 = contents().substr(old_sz, delta);
825 std::string part2 =
826 contents().substr(old_sz + delta, new_sz - old_sz - delta);
827 Write(Slice(part1));
828 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
829 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
830 Write(Slice(part2));
831 });
832
833 std::string record;
834 port::Thread log_reader_thread([&]() {
835 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
836 while (!TryRead(&record)) {
837 }
838 });
839 log_reader_thread.join();
840 log_writer_thread.join();
841 ASSERT_EQ("foo", record);
842 ASSERT_TRUE(eof);
843 }
844
845 TEST_P(RetriableLogTest, TailLog_FullHeader) {
846 ASSERT_OK(SetupTestEnv());
847 std::vector<int> remaining_bytes_in_last_record;
848 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
849 bool eof = false;
850 SyncPoint::GetInstance()->DisableProcessing();
851 SyncPoint::GetInstance()->LoadDependency(
852 {{"RetriableLogTest::TailLog:AfterPart1",
853 "RetriableLogTest::TailLog:BeforeReadRecord"},
854 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
855 "RetriableLogTest::TailLog:BeforePart2"}});
856 SyncPoint::GetInstance()->ClearAllCallBacks();
857 SyncPoint::GetInstance()->SetCallBack(
858 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
859 [&](void* /*arg*/) { eof = true; });
860 SyncPoint::GetInstance()->EnableProcessing();
861
862 size_t delta = header_size + 1;
863 port::Thread log_writer_thread([&]() {
864 size_t old_sz = contents().size();
865 Encode("foo");
866 size_t new_sz = contents().size();
867 std::string part1 = contents().substr(old_sz, delta);
868 std::string part2 =
869 contents().substr(old_sz + delta, new_sz - old_sz - delta);
870 Write(Slice(part1));
871 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
872 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
873 Write(Slice(part2));
874 ASSERT_TRUE(eof);
875 });
876
877 std::string record;
878 port::Thread log_reader_thread([&]() {
879 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
880 while (!TryRead(&record)) {
881 }
882 });
883 log_reader_thread.join();
884 log_writer_thread.join();
885 ASSERT_EQ("foo", record);
886 }
887
888 TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
889 // Clear all sync point callbacks even if this test does not use sync point.
890 // It is necessary, otherwise the execute of this test may hit a sync point
891 // with which a callback is registered. The registered callback may access
892 // some dead variable, causing segfault.
893 SyncPoint::GetInstance()->DisableProcessing();
894 SyncPoint::GetInstance()->ClearAllCallBacks();
895 ASSERT_OK(SetupTestEnv());
896 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
897 size_t delta = header_size - 1;
898 size_t old_sz = contents().size();
899 Encode("foo-bar");
900 size_t new_sz = contents().size();
901 std::string part1 = contents().substr(old_sz, delta);
902 std::string part2 =
903 contents().substr(old_sz + delta, new_sz - old_sz - delta);
904 Write(Slice(part1));
905 std::string record;
906 ASSERT_FALSE(TryRead(&record));
907 ASSERT_TRUE(record.empty());
908 Write(Slice(part2));
909 ASSERT_TRUE(TryRead(&record));
910 ASSERT_EQ("foo-bar", record);
911 }
912
913 INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
914
915 } // namespace log
916 } // namespace rocksdb
917
918 int main(int argc, char** argv) {
919 ::testing::InitGoogleTest(&argc, argv);
920 return RUN_ALL_TESTS();
921 }