]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/log_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / log_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/log_reader.h"
11#include "db/log_writer.h"
f67539c2
TL
12#include "env/composite_env_wrapper.h"
13#include "file/sequence_file_reader.h"
14#include "file/writable_file_writer.h"
7c673cae 15#include "rocksdb/env.h"
f67539c2
TL
16#include "test_util/testharness.h"
17#include "test_util/testutil.h"
7c673cae
FG
18#include "util/coding.h"
19#include "util/crc32c.h"
7c673cae 20#include "util/random.h"
7c673cae 21
f67539c2 22namespace ROCKSDB_NAMESPACE {
7c673cae
FG
23namespace log {
24
25// Construct a string of the specified length made out of the supplied
26// partial string.
27static std::string BigString(const std::string& partial_string, size_t n) {
28 std::string result;
29 while (result.size() < n) {
30 result.append(partial_string);
31 }
32 result.resize(n);
33 return result;
34}
35
36// Construct a string from a number
37static std::string NumberString(int n) {
38 char buf[50];
39 snprintf(buf, sizeof(buf), "%d.", n);
40 return std::string(buf);
41}
42
43// Return a skewed potentially long string
44static std::string RandomSkewedString(int i, Random* rnd) {
45 return BigString(NumberString(i), rnd->Skewed(17));
46}
47
494da23a
TL
48// Param type is tuple<int, bool>
49// get<0>(tuple): non-zero if recycling log, zero if regular log
50// get<1>(tuple): true if allow retry after read EOF, false otherwise
51class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
7c673cae
FG
52 private:
53 class StringSource : public SequentialFile {
54 public:
55 Slice& contents_;
56 bool force_error_;
57 size_t force_error_position_;
58 bool force_eof_;
59 size_t force_eof_position_;
60 bool returned_partial_;
494da23a
TL
61 bool fail_after_read_partial_;
62 explicit StringSource(Slice& contents, bool fail_after_read_partial)
63 : contents_(contents),
64 force_error_(false),
65 force_error_position_(0),
66 force_eof_(false),
67 force_eof_position_(0),
68 returned_partial_(false),
69 fail_after_read_partial_(fail_after_read_partial) {}
70
71 Status Read(size_t n, Slice* result, char* scratch) override {
72 if (fail_after_read_partial_) {
73 EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
74 }
7c673cae
FG
75
76 if (force_error_) {
77 if (force_error_position_ >= n) {
78 force_error_position_ -= n;
79 } else {
80 *result = Slice(contents_.data(), force_error_position_);
81 contents_.remove_prefix(force_error_position_);
82 force_error_ = false;
83 returned_partial_ = true;
84 return Status::Corruption("read error");
85 }
86 }
87
88 if (contents_.size() < n) {
89 n = contents_.size();
90 returned_partial_ = true;
91 }
92
93 if (force_eof_) {
94 if (force_eof_position_ >= n) {
95 force_eof_position_ -= n;
96 } else {
97 force_eof_ = false;
98 n = force_eof_position_;
99 returned_partial_ = true;
100 }
101 }
102
103 // By using scratch we ensure that caller has control over the
104 // lifetime of result.data()
105 memcpy(scratch, contents_.data(), n);
106 *result = Slice(scratch, n);
107
108 contents_.remove_prefix(n);
109 return Status::OK();
110 }
111
494da23a 112 Status Skip(uint64_t n) override {
7c673cae
FG
113 if (n > contents_.size()) {
114 contents_.clear();
115 return Status::NotFound("in-memory file skipepd past end");
116 }
117
118 contents_.remove_prefix(n);
119
120 return Status::OK();
121 }
122 };
123
f67539c2
TL
124 inline StringSource* GetStringSourceFromLegacyReader(
125 SequentialFileReader* reader) {
126 LegacySequentialFileWrapper* file =
127 static_cast<LegacySequentialFileWrapper*>(reader->file());
128 return static_cast<StringSource*>(file->target());
129 }
130
7c673cae
FG
131 class ReportCollector : public Reader::Reporter {
132 public:
133 size_t dropped_bytes_;
134 std::string message_;
135
136 ReportCollector() : dropped_bytes_(0) { }
494da23a 137 void Corruption(size_t bytes, const Status& status) override {
7c673cae
FG
138 dropped_bytes_ += bytes;
139 message_.append(status.ToString());
140 }
141 };
142
143 std::string& dest_contents() {
f67539c2 144 auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
7c673cae
FG
145 assert(dest);
146 return dest->contents_;
147 }
148
149 const std::string& dest_contents() const {
f67539c2 150 auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
7c673cae
FG
151 assert(dest);
152 return dest->contents_;
153 }
154
155 void reset_source_contents() {
f67539c2 156 auto src = GetStringSourceFromLegacyReader(reader_->file());
7c673cae
FG
157 assert(src);
158 src->contents_ = dest_contents();
159 }
160
161 Slice reader_contents_;
494da23a
TL
162 std::unique_ptr<WritableFileWriter> dest_holder_;
163 std::unique_ptr<SequentialFileReader> source_holder_;
7c673cae
FG
164 ReportCollector report_;
165 Writer writer_;
494da23a 166 std::unique_ptr<Reader> reader_;
7c673cae 167
494da23a
TL
168 protected:
169 bool allow_retry_read_;
7c673cae
FG
170
171 public:
172 LogTest()
173 : reader_contents_(),
174 dest_holder_(test::GetWritableFileWriter(
11fdf7f2
TL
175 new test::StringSink(&reader_contents_), "" /* don't care */)),
176 source_holder_(test::GetSequentialFileReader(
494da23a
TL
177 new StringSource(reader_contents_, !std::get<1>(GetParam())),
178 "" /* file name */)),
179 writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
180 allow_retry_read_(std::get<1>(GetParam())) {
181 if (allow_retry_read_) {
182 reader_.reset(new FragmentBufferedReader(
183 nullptr, std::move(source_holder_), &report_, true /* checksum */,
184 123 /* log_number */));
185 } else {
186 reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
187 true /* checksum */, 123 /* log_number */));
188 }
7c673cae
FG
189 }
190
191 Slice* get_reader_contents() { return &reader_contents_; }
192
193 void Write(const std::string& msg) {
194 writer_.AddRecord(Slice(msg));
195 }
196
197 size_t WrittenBytes() const {
198 return dest_contents().size();
199 }
200
201 std::string Read(const WALRecoveryMode wal_recovery_mode =
202 WALRecoveryMode::kTolerateCorruptedTailRecords) {
203 std::string scratch;
204 Slice record;
494da23a
TL
205 bool ret = false;
206 ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
207 if (ret) {
7c673cae
FG
208 return record.ToString();
209 } else {
210 return "EOF";
211 }
212 }
213
11fdf7f2 214 void IncrementByte(int offset, char delta) {
7c673cae
FG
215 dest_contents()[offset] += delta;
216 }
217
218 void SetByte(int offset, char new_byte) {
219 dest_contents()[offset] = new_byte;
220 }
221
222 void ShrinkSize(int bytes) {
f67539c2 223 auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
7c673cae
FG
224 assert(dest);
225 dest->Drop(bytes);
226 }
227
228 void FixChecksum(int header_offset, int len, bool recyclable) {
229 // Compute crc of type/len/data
230 int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
231 uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
232 header_size - 6 + len);
233 crc = crc32c::Mask(crc);
234 EncodeFixed32(&dest_contents()[header_offset], crc);
235 }
236
237 void ForceError(size_t position = 0) {
f67539c2 238 auto src = GetStringSourceFromLegacyReader(reader_->file());
7c673cae
FG
239 src->force_error_ = true;
240 src->force_error_position_ = position;
241 }
242
243 size_t DroppedBytes() const {
244 return report_.dropped_bytes_;
245 }
246
247 std::string ReportMessage() const {
248 return report_.message_;
249 }
250
251 void ForceEOF(size_t position = 0) {
f67539c2 252 auto src = GetStringSourceFromLegacyReader(reader_->file());
7c673cae
FG
253 src->force_eof_ = true;
254 src->force_eof_position_ = position;
255 }
256
257 void UnmarkEOF() {
f67539c2 258 auto src = GetStringSourceFromLegacyReader(reader_->file());
7c673cae 259 src->returned_partial_ = false;
494da23a 260 reader_->UnmarkEOF();
7c673cae
FG
261 }
262
494da23a 263 bool IsEOF() { return reader_->IsEOF(); }
7c673cae
FG
264
265 // Returns OK iff recorded error message contains "msg"
266 std::string MatchError(const std::string& msg) const {
267 if (report_.message_.find(msg) == std::string::npos) {
268 return report_.message_;
269 } else {
270 return "OK";
271 }
272 }
7c673cae
FG
273};
274
7c673cae
FG
275TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
276
277TEST_P(LogTest, ReadWrite) {
278 Write("foo");
279 Write("bar");
280 Write("");
281 Write("xxxx");
282 ASSERT_EQ("foo", Read());
283 ASSERT_EQ("bar", Read());
284 ASSERT_EQ("", Read());
285 ASSERT_EQ("xxxx", Read());
286 ASSERT_EQ("EOF", Read());
287 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
288}
289
290TEST_P(LogTest, ManyBlocks) {
291 for (int i = 0; i < 100000; i++) {
292 Write(NumberString(i));
293 }
294 for (int i = 0; i < 100000; i++) {
295 ASSERT_EQ(NumberString(i), Read());
296 }
297 ASSERT_EQ("EOF", Read());
298}
299
300TEST_P(LogTest, Fragmentation) {
301 Write("small");
302 Write(BigString("medium", 50000));
303 Write(BigString("large", 100000));
304 ASSERT_EQ("small", Read());
305 ASSERT_EQ(BigString("medium", 50000), Read());
306 ASSERT_EQ(BigString("large", 100000), Read());
307 ASSERT_EQ("EOF", Read());
308}
309
310TEST_P(LogTest, MarginalTrailer) {
311 // Make a trailer that is exactly the same length as an empty record.
494da23a
TL
312 int header_size =
313 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
314 const int n = kBlockSize - 2 * header_size;
315 Write(BigString("foo", n));
316 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
317 Write("");
318 Write("bar");
319 ASSERT_EQ(BigString("foo", n), Read());
320 ASSERT_EQ("", Read());
321 ASSERT_EQ("bar", Read());
322 ASSERT_EQ("EOF", Read());
323}
324
325TEST_P(LogTest, MarginalTrailer2) {
326 // Make a trailer that is exactly the same length as an empty record.
494da23a
TL
327 int header_size =
328 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
329 const int n = kBlockSize - 2 * header_size;
330 Write(BigString("foo", n));
331 ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
332 Write("bar");
333 ASSERT_EQ(BigString("foo", n), Read());
334 ASSERT_EQ("bar", Read());
335 ASSERT_EQ("EOF", Read());
336 ASSERT_EQ(0U, DroppedBytes());
337 ASSERT_EQ("", ReportMessage());
338}
339
340TEST_P(LogTest, ShortTrailer) {
494da23a
TL
341 int header_size =
342 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
343 const int n = kBlockSize - 2 * header_size + 4;
344 Write(BigString("foo", n));
345 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
346 Write("");
347 Write("bar");
348 ASSERT_EQ(BigString("foo", n), Read());
349 ASSERT_EQ("", Read());
350 ASSERT_EQ("bar", Read());
351 ASSERT_EQ("EOF", Read());
352}
353
354TEST_P(LogTest, AlignedEof) {
494da23a
TL
355 int header_size =
356 std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
357 const int n = kBlockSize - 2 * header_size + 4;
358 Write(BigString("foo", n));
359 ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
360 ASSERT_EQ(BigString("foo", n), Read());
361 ASSERT_EQ("EOF", Read());
362}
363
364TEST_P(LogTest, RandomRead) {
365 const int N = 500;
366 Random write_rnd(301);
367 for (int i = 0; i < N; i++) {
368 Write(RandomSkewedString(i, &write_rnd));
369 }
370 Random read_rnd(301);
371 for (int i = 0; i < N; i++) {
372 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
373 }
374 ASSERT_EQ("EOF", Read());
375}
376
377// Tests of all the error paths in log_reader.cc follow:
378
379TEST_P(LogTest, ReadError) {
380 Write("foo");
381 ForceError();
382 ASSERT_EQ("EOF", Read());
383 ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes());
384 ASSERT_EQ("OK", MatchError("read error"));
385}
386
387TEST_P(LogTest, BadRecordType) {
388 Write("foo");
389 // Type is stored in header[6]
390 IncrementByte(6, 100);
391 FixChecksum(0, 3, false);
392 ASSERT_EQ("EOF", Read());
393 ASSERT_EQ(3U, DroppedBytes());
394 ASSERT_EQ("OK", MatchError("unknown record type"));
395}
396
397TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
398 Write("foo");
399 ShrinkSize(4); // Drop all payload as well as a header byte
400 ASSERT_EQ("EOF", Read());
401 // Truncated last record is ignored, not treated as an error
402 ASSERT_EQ(0U, DroppedBytes());
403 ASSERT_EQ("", ReportMessage());
404}
405
406TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
494da23a
TL
407 if (allow_retry_read_) {
408 // If read retry is allowed, then truncated trailing record should not
409 // raise an error.
410 return;
411 }
7c673cae
FG
412 Write("foo");
413 ShrinkSize(4); // Drop all payload as well as a header byte
414 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
415 // Truncated last record is ignored, not treated as an error
416 ASSERT_GT(DroppedBytes(), 0U);
417 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
418}
419
420TEST_P(LogTest, BadLength) {
494da23a
TL
421 if (allow_retry_read_) {
422 // If read retry is allowed, then we should not raise an error when the
423 // record length specified in header is longer than data currently
424 // available. It's possible that the body of the record is not written yet.
425 return;
426 }
427 bool recyclable_log = (std::get<0>(GetParam()) != 0);
428 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
429 const int kPayloadSize = kBlockSize - header_size;
430 Write(BigString("bar", kPayloadSize));
431 Write("foo");
432 // Least significant size byte is stored in header[4].
433 IncrementByte(4, 1);
494da23a 434 if (!recyclable_log) {
7c673cae
FG
435 ASSERT_EQ("foo", Read());
436 ASSERT_EQ(kBlockSize, DroppedBytes());
437 ASSERT_EQ("OK", MatchError("bad record length"));
438 } else {
439 ASSERT_EQ("EOF", Read());
440 }
441}
442
443TEST_P(LogTest, BadLengthAtEndIsIgnored) {
494da23a
TL
444 if (allow_retry_read_) {
445 // If read retry is allowed, then we should not raise an error when the
446 // record length specified in header is longer than data currently
447 // available. It's possible that the body of the record is not written yet.
448 return;
449 }
7c673cae
FG
450 Write("foo");
451 ShrinkSize(1);
452 ASSERT_EQ("EOF", Read());
453 ASSERT_EQ(0U, DroppedBytes());
454 ASSERT_EQ("", ReportMessage());
455}
456
457TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
494da23a
TL
458 if (allow_retry_read_) {
459 // If read retry is allowed, then we should not raise an error when the
460 // record length specified in header is longer than data currently
461 // available. It's possible that the body of the record is not written yet.
462 return;
463 }
7c673cae
FG
464 Write("foo");
465 ShrinkSize(1);
466 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
467 ASSERT_GT(DroppedBytes(), 0U);
20effc67 468 ASSERT_EQ("OK", MatchError("Corruption: truncated record body"));
7c673cae
FG
469}
470
471TEST_P(LogTest, ChecksumMismatch) {
472 Write("foooooo");
473 IncrementByte(0, 14);
474 ASSERT_EQ("EOF", Read());
494da23a
TL
475 bool recyclable_log = (std::get<0>(GetParam()) != 0);
476 if (!recyclable_log) {
7c673cae
FG
477 ASSERT_EQ(14U, DroppedBytes());
478 ASSERT_EQ("OK", MatchError("checksum mismatch"));
479 } else {
480 ASSERT_EQ(0U, DroppedBytes());
481 ASSERT_EQ("", ReportMessage());
482 }
483}
484
485TEST_P(LogTest, UnexpectedMiddleType) {
486 Write("foo");
494da23a
TL
487 bool recyclable_log = (std::get<0>(GetParam()) != 0);
488 SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
489 : kMiddleType));
490 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
491 ASSERT_EQ("EOF", Read());
492 ASSERT_EQ(3U, DroppedBytes());
493 ASSERT_EQ("OK", MatchError("missing start"));
494}
495
496TEST_P(LogTest, UnexpectedLastType) {
497 Write("foo");
494da23a
TL
498 bool recyclable_log = (std::get<0>(GetParam()) != 0);
499 SetByte(6,
500 static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
501 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
502 ASSERT_EQ("EOF", Read());
503 ASSERT_EQ(3U, DroppedBytes());
504 ASSERT_EQ("OK", MatchError("missing start"));
505}
506
507TEST_P(LogTest, UnexpectedFullType) {
508 Write("foo");
509 Write("bar");
494da23a
TL
510 bool recyclable_log = (std::get<0>(GetParam()) != 0);
511 SetByte(
512 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
513 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
514 ASSERT_EQ("bar", Read());
515 ASSERT_EQ("EOF", Read());
516 ASSERT_EQ(3U, DroppedBytes());
517 ASSERT_EQ("OK", MatchError("partial record without end"));
518}
519
520TEST_P(LogTest, UnexpectedFirstType) {
521 Write("foo");
522 Write(BigString("bar", 100000));
494da23a
TL
523 bool recyclable_log = (std::get<0>(GetParam()) != 0);
524 SetByte(
525 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
526 FixChecksum(0, 3, !!recyclable_log);
7c673cae
FG
527 ASSERT_EQ(BigString("bar", 100000), Read());
528 ASSERT_EQ("EOF", Read());
529 ASSERT_EQ(3U, DroppedBytes());
530 ASSERT_EQ("OK", MatchError("partial record without end"));
531}
532
533TEST_P(LogTest, MissingLastIsIgnored) {
534 Write(BigString("bar", kBlockSize));
535 // Remove the LAST block, including header.
536 ShrinkSize(14);
537 ASSERT_EQ("EOF", Read());
538 ASSERT_EQ("", ReportMessage());
539 ASSERT_EQ(0U, DroppedBytes());
540}
541
542TEST_P(LogTest, MissingLastIsNotIgnored) {
494da23a
TL
543 if (allow_retry_read_) {
544 // If read retry is allowed, then truncated trailing record should not
545 // raise an error.
546 return;
547 }
7c673cae
FG
548 Write(BigString("bar", kBlockSize));
549 // Remove the LAST block, including header.
550 ShrinkSize(14);
551 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
552 ASSERT_GT(DroppedBytes(), 0U);
553 ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
554}
555
556TEST_P(LogTest, PartialLastIsIgnored) {
557 Write(BigString("bar", kBlockSize));
558 // Cause a bad record length in the LAST block.
559 ShrinkSize(1);
560 ASSERT_EQ("EOF", Read());
561 ASSERT_EQ("", ReportMessage());
562 ASSERT_EQ(0U, DroppedBytes());
563}
564
565TEST_P(LogTest, PartialLastIsNotIgnored) {
494da23a
TL
566 if (allow_retry_read_) {
567 // If read retry is allowed, then truncated trailing record should not
568 // raise an error.
569 return;
570 }
7c673cae
FG
571 Write(BigString("bar", kBlockSize));
572 // Cause a bad record length in the LAST block.
573 ShrinkSize(1);
574 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
575 ASSERT_GT(DroppedBytes(), 0U);
20effc67 576 ASSERT_EQ("OK", MatchError("Corruption: truncated record body"));
7c673cae
FG
577}
578
579TEST_P(LogTest, ErrorJoinsRecords) {
580 // Consider two fragmented records:
581 // first(R1) last(R1) first(R2) last(R2)
582 // where the middle two fragments disappear. We do not want
583 // first(R1),last(R2) to get joined and returned as a valid record.
584
585 // Write records that span two blocks
586 Write(BigString("foo", kBlockSize));
587 Write(BigString("bar", kBlockSize));
588 Write("correct");
589
590 // Wipe the middle block
591 for (unsigned int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
592 SetByte(offset, 'x');
593 }
594
494da23a
TL
595 bool recyclable_log = (std::get<0>(GetParam()) != 0);
596 if (!recyclable_log) {
7c673cae
FG
597 ASSERT_EQ("correct", Read());
598 ASSERT_EQ("EOF", Read());
599 size_t dropped = DroppedBytes();
600 ASSERT_LE(dropped, 2 * kBlockSize + 100);
601 ASSERT_GE(dropped, 2 * kBlockSize);
602 } else {
603 ASSERT_EQ("EOF", Read());
604 }
605}
606
7c673cae
FG
607TEST_P(LogTest, ClearEofSingleBlock) {
608 Write("foo");
609 Write("bar");
494da23a
TL
610 bool recyclable_log = (std::get<0>(GetParam()) != 0);
611 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
612 ForceEOF(3 + header_size + 2);
613 ASSERT_EQ("foo", Read());
614 UnmarkEOF();
615 ASSERT_EQ("bar", Read());
616 ASSERT_TRUE(IsEOF());
617 ASSERT_EQ("EOF", Read());
618 Write("xxx");
619 UnmarkEOF();
620 ASSERT_EQ("xxx", Read());
621 ASSERT_TRUE(IsEOF());
622}
623
624TEST_P(LogTest, ClearEofMultiBlock) {
625 size_t num_full_blocks = 5;
494da23a
TL
626 bool recyclable_log = (std::get<0>(GetParam()) != 0);
627 int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
7c673cae
FG
628 size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
629 Write(BigString("foo", n));
630 Write(BigString("bar", n));
631 ForceEOF(n + num_full_blocks * header_size + header_size + 3);
632 ASSERT_EQ(BigString("foo", n), Read());
633 ASSERT_TRUE(IsEOF());
634 UnmarkEOF();
635 ASSERT_EQ(BigString("bar", n), Read());
636 ASSERT_TRUE(IsEOF());
637 Write(BigString("xxx", n));
638 UnmarkEOF();
639 ASSERT_EQ(BigString("xxx", n), Read());
640 ASSERT_TRUE(IsEOF());
641}
642
643TEST_P(LogTest, ClearEofError) {
644 // If an error occurs during Read() in UnmarkEOF(), the records contained
645 // in the buffer should be returned on subsequent calls of ReadRecord()
646 // until no more full records are left, whereafter ReadRecord() should return
647 // false to indicate that it cannot read any further.
648
649 Write("foo");
650 Write("bar");
651 UnmarkEOF();
652 ASSERT_EQ("foo", Read());
653 ASSERT_TRUE(IsEOF());
654 Write("xxx");
655 ForceError(0);
656 UnmarkEOF();
657 ASSERT_EQ("bar", Read());
658 ASSERT_EQ("EOF", Read());
659}
660
661TEST_P(LogTest, ClearEofError2) {
662 Write("foo");
663 Write("bar");
664 UnmarkEOF();
665 ASSERT_EQ("foo", Read());
666 Write("xxx");
667 ForceError(3);
668 UnmarkEOF();
669 ASSERT_EQ("bar", Read());
670 ASSERT_EQ("EOF", Read());
671 ASSERT_EQ(3U, DroppedBytes());
672 ASSERT_EQ("OK", MatchError("read error"));
673}
674
675TEST_P(LogTest, Recycle) {
494da23a
TL
676 bool recyclable_log = (std::get<0>(GetParam()) != 0);
677 if (!recyclable_log) {
7c673cae
FG
678 return; // test is only valid for recycled logs
679 }
680 Write("foo");
681 Write("bar");
682 Write("baz");
683 Write("bif");
684 Write("blitz");
685 while (get_reader_contents()->size() < log::kBlockSize * 2) {
686 Write("xxxxxxxxxxxxxxxx");
687 }
494da23a 688 std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
11fdf7f2
TL
689 new test::OverwritingStringSink(get_reader_contents()),
690 "" /* don't care */));
7c673cae
FG
691 Writer recycle_writer(std::move(dest_holder), 123, true);
692 recycle_writer.AddRecord(Slice("foooo"));
693 recycle_writer.AddRecord(Slice("bar"));
694 ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
695 ASSERT_EQ("foooo", Read());
696 ASSERT_EQ("bar", Read());
697 ASSERT_EQ("EOF", Read());
698}
699
494da23a
TL
700INSTANTIATE_TEST_CASE_P(bool, LogTest,
701 ::testing::Values(std::make_tuple(0, false),
702 std::make_tuple(0, true),
703 std::make_tuple(1, false),
704 std::make_tuple(1, true)));
705
706class RetriableLogTest : public ::testing::TestWithParam<int> {
707 private:
708 class ReportCollector : public Reader::Reporter {
709 public:
710 size_t dropped_bytes_;
711 std::string message_;
712
713 ReportCollector() : dropped_bytes_(0) {}
714 void Corruption(size_t bytes, const Status& status) override {
715 dropped_bytes_ += bytes;
716 message_.append(status.ToString());
717 }
718 };
719
720 Slice contents_;
721 std::unique_ptr<WritableFileWriter> dest_holder_;
722 std::unique_ptr<Writer> log_writer_;
723 Env* env_;
724 EnvOptions env_options_;
725 const std::string test_dir_;
726 const std::string log_file_;
727 std::unique_ptr<WritableFileWriter> writer_;
728 std::unique_ptr<SequentialFileReader> reader_;
729 ReportCollector report_;
730 std::unique_ptr<FragmentBufferedReader> log_reader_;
731
732 public:
733 RetriableLogTest()
734 : contents_(),
735 dest_holder_(nullptr),
736 log_writer_(nullptr),
737 env_(Env::Default()),
738 test_dir_(test::PerThreadDBPath("retriable_log_test")),
739 log_file_(test_dir_ + "/log"),
740 writer_(nullptr),
741 reader_(nullptr),
742 log_reader_(nullptr) {}
743
744 Status SetupTestEnv() {
745 dest_holder_.reset(test::GetWritableFileWriter(
746 new test::StringSink(&contents_), "" /* file name */));
747 assert(dest_holder_ != nullptr);
748 log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
749 assert(log_writer_ != nullptr);
750
751 Status s;
752 s = env_->CreateDirIfMissing(test_dir_);
753 std::unique_ptr<WritableFile> writable_file;
754 if (s.ok()) {
755 s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
756 }
757 if (s.ok()) {
f67539c2
TL
758 writer_.reset(new WritableFileWriter(
759 NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_,
760 env_options_));
494da23a
TL
761 assert(writer_ != nullptr);
762 }
763 std::unique_ptr<SequentialFile> seq_file;
764 if (s.ok()) {
765 s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
766 }
767 if (s.ok()) {
f67539c2
TL
768 reader_.reset(new SequentialFileReader(
769 NewLegacySequentialFileWrapper(seq_file), log_file_));
494da23a
TL
770 assert(reader_ != nullptr);
771 log_reader_.reset(new FragmentBufferedReader(
772 nullptr, std::move(reader_), &report_, true /* checksum */,
773 123 /* log_number */));
774 assert(log_reader_ != nullptr);
775 }
776 return s;
777 }
778
779 std::string contents() {
f67539c2 780 auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file());
494da23a
TL
781 assert(file != nullptr);
782 return file->contents_;
783 }
784
785 void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
786
787 void Write(const Slice& data) {
788 writer_->Append(data);
789 writer_->Sync(true);
790 }
791
792 bool TryRead(std::string* result) {
793 assert(result != nullptr);
794 result->clear();
795 std::string scratch;
796 Slice record;
797 bool r = log_reader_->ReadRecord(&record, &scratch);
798 if (r) {
799 result->assign(record.data(), record.size());
800 return true;
801 } else {
802 return false;
803 }
804 }
805};
806
807TEST_P(RetriableLogTest, TailLog_PartialHeader) {
808 ASSERT_OK(SetupTestEnv());
809 std::vector<int> remaining_bytes_in_last_record;
810 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
811 bool eof = false;
812 SyncPoint::GetInstance()->DisableProcessing();
813 SyncPoint::GetInstance()->LoadDependency(
814 {{"RetriableLogTest::TailLog:AfterPart1",
815 "RetriableLogTest::TailLog:BeforeReadRecord"},
816 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
817 "RetriableLogTest::TailLog:BeforePart2"}});
818 SyncPoint::GetInstance()->ClearAllCallBacks();
819 SyncPoint::GetInstance()->SetCallBack(
820 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
821 [&](void* /*arg*/) { eof = true; });
822 SyncPoint::GetInstance()->EnableProcessing();
823
824 size_t delta = header_size - 1;
825 port::Thread log_writer_thread([&]() {
826 size_t old_sz = contents().size();
827 Encode("foo");
828 size_t new_sz = contents().size();
829 std::string part1 = contents().substr(old_sz, delta);
830 std::string part2 =
831 contents().substr(old_sz + delta, new_sz - old_sz - delta);
832 Write(Slice(part1));
833 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
834 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
835 Write(Slice(part2));
836 });
837
838 std::string record;
839 port::Thread log_reader_thread([&]() {
840 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
841 while (!TryRead(&record)) {
842 }
843 });
844 log_reader_thread.join();
845 log_writer_thread.join();
846 ASSERT_EQ("foo", record);
847 ASSERT_TRUE(eof);
848}
849
850TEST_P(RetriableLogTest, TailLog_FullHeader) {
851 ASSERT_OK(SetupTestEnv());
852 std::vector<int> remaining_bytes_in_last_record;
853 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
854 bool eof = false;
855 SyncPoint::GetInstance()->DisableProcessing();
856 SyncPoint::GetInstance()->LoadDependency(
857 {{"RetriableLogTest::TailLog:AfterPart1",
858 "RetriableLogTest::TailLog:BeforeReadRecord"},
859 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
860 "RetriableLogTest::TailLog:BeforePart2"}});
861 SyncPoint::GetInstance()->ClearAllCallBacks();
862 SyncPoint::GetInstance()->SetCallBack(
863 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
864 [&](void* /*arg*/) { eof = true; });
865 SyncPoint::GetInstance()->EnableProcessing();
866
867 size_t delta = header_size + 1;
868 port::Thread log_writer_thread([&]() {
869 size_t old_sz = contents().size();
870 Encode("foo");
871 size_t new_sz = contents().size();
872 std::string part1 = contents().substr(old_sz, delta);
873 std::string part2 =
874 contents().substr(old_sz + delta, new_sz - old_sz - delta);
875 Write(Slice(part1));
876 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
877 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
878 Write(Slice(part2));
879 ASSERT_TRUE(eof);
880 });
881
882 std::string record;
883 port::Thread log_reader_thread([&]() {
884 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
885 while (!TryRead(&record)) {
886 }
887 });
888 log_reader_thread.join();
889 log_writer_thread.join();
890 ASSERT_EQ("foo", record);
891}
892
893TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
894 // Clear all sync point callbacks even if this test does not use sync point.
895 // It is necessary, otherwise the execute of this test may hit a sync point
896 // with which a callback is registered. The registered callback may access
897 // some dead variable, causing segfault.
898 SyncPoint::GetInstance()->DisableProcessing();
899 SyncPoint::GetInstance()->ClearAllCallBacks();
900 ASSERT_OK(SetupTestEnv());
901 size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
902 size_t delta = header_size - 1;
903 size_t old_sz = contents().size();
904 Encode("foo-bar");
905 size_t new_sz = contents().size();
906 std::string part1 = contents().substr(old_sz, delta);
907 std::string part2 =
908 contents().substr(old_sz + delta, new_sz - old_sz - delta);
909 Write(Slice(part1));
910 std::string record;
911 ASSERT_FALSE(TryRead(&record));
912 ASSERT_TRUE(record.empty());
913 Write(Slice(part2));
914 ASSERT_TRUE(TryRead(&record));
915 ASSERT_EQ("foo-bar", record);
916}
917
918INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
7c673cae
FG
919
920} // namespace log
f67539c2 921} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
922
923int main(int argc, char** argv) {
924 ::testing::InitGoogleTest(&argc, argv);
925 return RUN_ALL_TESTS();
926}