#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/sst_file_writer.h"
+#include "util/fault_injection_test_env.h"
#include "util/filename.h"
#include "util/testutil.h"
namespace rocksdb {
-class ExternalSSTFileTest : public DBTestBase {
+class ExternalSSTFileTest
+ : public DBTestBase,
+ public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") {
sst_files_dir_ = dbname_ + "/sst_files/";
env_->CreateDir(sst_files_dir_);
}
- Status GenerateAndAddExternalFile(
- const Options options,
- std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
- bool allow_global_seqno = false, bool sort_data = false,
- std::map<std::string, std::string>* true_data = nullptr,
- ColumnFamilyHandle* cfh = nullptr) {
+ Status GenerateOneExternalFile(
+ const Options& options, ColumnFamilyHandle* cfh,
+ std::vector<std::pair<std::string, std::string>>& data, int file_id,
+ bool sort_data, std::string* external_file_path,
+ std::map<std::string, std::string>* true_data) {
// Generate a file id if not provided
- if (file_id == -1) {
- file_id = last_file_id_ + 1;
- last_file_id_++;
+ if (-1 == file_id) {
+ file_id = (++last_file_id_);
}
-
// Sort data if asked to do so
if (sort_data) {
std::sort(data.begin(), data.end(),
}
std::string file_path = sst_files_dir_ + ToString(file_id);
SstFileWriter sst_file_writer(EnvOptions(), options, cfh);
-
Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
return s;
}
- for (auto& entry : data) {
+ for (const auto& entry : data) {
s = sst_file_writer.Put(entry.first, entry.second);
if (!s.ok()) {
sst_file_writer.Finish();
}
}
s = sst_file_writer.Finish();
-
- if (s.ok()) {
- IngestExternalFileOptions ifo;
- ifo.allow_global_seqno = allow_global_seqno;
- if (cfh) {
- s = db_->IngestExternalFile(cfh, {file_path}, ifo);
- } else {
- s = db_->IngestExternalFile({file_path}, ifo);
- }
+ if (s.ok() && external_file_path != nullptr) {
+ *external_file_path = file_path;
}
-
- if (s.ok() && true_data) {
- for (auto& entry : data) {
- (*true_data)[entry.first] = entry.second;
+ if (s.ok() && nullptr != true_data) {
+ for (const auto& entry : data) {
+ true_data->insert({entry.first, entry.second});
}
}
-
return s;
}
- Status GenerateAndAddExternalFileIngestBehind(
- const Options options, const IngestExternalFileOptions ifo,
+ Status GenerateAndAddExternalFile(
+ const Options options,
std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
+ bool allow_global_seqno = false, bool write_global_seqno = false,
+ bool verify_checksums_before_ingest = true, bool ingest_behind = false,
bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
s = sst_file_writer.Finish();
if (s.ok()) {
+ IngestExternalFileOptions ifo;
+ ifo.allow_global_seqno = allow_global_seqno;
+ ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false;
+ ifo.verify_checksums_before_ingest = verify_checksums_before_ingest;
+ ifo.ingest_behind = ingest_behind;
if (cfh) {
s = db_->IngestExternalFile(cfh, {file_path}, ifo);
} else {
return s;
}
+ Status GenerateAndAddExternalFiles(
+ const Options& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ const std::vector<IngestExternalFileOptions>& ifos,
+ std::vector<std::vector<std::pair<std::string, std::string>>>& data,
+ int file_id, bool sort_data,
+ std::vector<std::map<std::string, std::string>>& true_data) {
+ if (-1 == file_id) {
+ file_id = (++last_file_id_);
+ }
+ // Generate external SST files, one for each column family
+ size_t num_cfs = column_families.size();
+ assert(ifos.size() == num_cfs);
+ assert(data.size() == num_cfs);
+ Status s;
+ std::vector<IngestExternalFileArg> args(num_cfs);
+ for (size_t i = 0; i != num_cfs; ++i) {
+ std::string external_file_path;
+ s = GenerateOneExternalFile(
+ options, column_families[i], data[i], file_id, sort_data,
+ &external_file_path,
+ true_data.size() == num_cfs ? &true_data[i] : nullptr);
+ if (!s.ok()) {
+ return s;
+ }
+ ++file_id;
+ args[i].column_family = column_families[i];
+ args[i].external_files.push_back(external_file_path);
+ args[i].options = ifos[i];
+ }
+ s = db_->IngestExternalFiles(args);
+ return s;
+ }
Status GenerateAndAddExternalFile(
const Options options, std::vector<std::pair<int, std::string>> data,
- int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
+ int file_id = -1, bool allow_global_seqno = false,
+ bool write_global_seqno = false,
+ bool verify_checksums_before_ingest = true, bool ingest_behind = false,
+ bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
file_data.emplace_back(Key(entry.first), entry.second);
}
return GenerateAndAddExternalFile(options, file_data, file_id,
- allow_global_seqno, sort_data, true_data,
- cfh);
+ allow_global_seqno, write_global_seqno,
+ verify_checksums_before_ingest,
+ ingest_behind, sort_data, true_data, cfh);
}
Status GenerateAndAddExternalFile(
const Options options, std::vector<int> keys, int file_id = -1,
- bool allow_global_seqno = false, bool sort_data = false,
+ bool allow_global_seqno = false, bool write_global_seqno = false,
+ bool verify_checksums_before_ingest = true, bool ingest_behind = false,
+ bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
}
return GenerateAndAddExternalFile(options, file_data, file_id,
- allow_global_seqno, sort_data, true_data,
- cfh);
+ allow_global_seqno, write_global_seqno,
+ verify_checksums_before_ingest,
+ ingest_behind, sort_data, true_data, cfh);
}
Status DeprecatedAddFile(const std::vector<std::string>& files,
bool move_files = false,
- bool skip_snapshot_check = false) {
+ bool skip_snapshot_check = false,
+ bool skip_write_global_seqno = false) {
IngestExternalFileOptions opts;
opts.move_files = move_files;
opts.snapshot_consistency = !skip_snapshot_check;
opts.allow_global_seqno = false;
opts.allow_blocking_flush = false;
+ opts.write_global_seqno = !skip_write_global_seqno;
return db_->IngestExternalFile(files, opts);
}
- ~ExternalSSTFileTest() { test::DestroyDir(env_, sst_files_dir_); }
+ ~ExternalSSTFileTest() override { test::DestroyDir(env_, sst_files_dir_); }
protected:
int last_file_id_ = 0;
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
kRangeDelSkipConfigs));
}
+
class SstFileWriterCollector : public TablePropertiesCollector {
public:
explicit SstFileWriterCollector(const std::string prefix) : prefix_(prefix) {
return Status::OK();
}
- virtual UserCollectedProperties GetReadableProperties() const override {
+ UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
public:
explicit SstFileWriterCollectorFactory(std::string prefix)
: prefix_(prefix), num_created_(0) {}
- virtual TablePropertiesCollector* CreateTablePropertiesCollector(
+ TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context /*context*/) override {
num_created_++;
return new SstFileWriterCollector(prefix_);
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
-TEST_F(ExternalSSTFileTest, PickedLevel) {
+TEST_P(ExternalSSTFileTest, PickedLevel) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 4;
std::map<std::string, std::string> true_data;
// File 0 will go to last level (L3)
- ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, -1, false, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, -1, false, false, true,
+ false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "0,0,0,1");
// File 1 will go to level L2 (since it overlap with file 0 in L3)
- ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, -1, false, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, -1, false, false, true,
+ false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "0,0,1,1");
rocksdb::SyncPoint::GetInstance()->LoadDependency({
// This file overlaps with file 0 (L3), file 1 (L2) and the
// output of compaction going to L1
- ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true,
+ false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "5,0,1,1");
// This file does not overlap with any file or with the running compaction
ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false,
- &true_data));
+ false, false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "5,0,1,2");
// Hold compaction from finishing
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
// After full compaction, there should be only 1 file.
std::vector<std::string> files;
// This file overlaps with the output of the compaction (going to L3)
// so the file will be added to L0 since L3 is the base level
ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false,
- false, &true_data));
+ false, true, false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "5");
// This file does not overlap with the current running compactiong
ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false,
- &true_data));
+ true, false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "5,0,0,1");
// Hold compaction from finishing
Reopen(options);
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 15, 19}, -1, false, false,
- &true_data));
+ true, false, false, &true_data));
ASSERT_EQ(FilesPerLevel(), "1,0,0,3");
ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1001, 1002}, -1, false,
- false, &true_data));
+ false, true, false, false, &true_data));
ASSERT_EQ(FilesPerLevel(), "1,0,0,4");
ASSERT_OK(GenerateAndAddExternalFile(options, {500, 600, 700}, -1, false,
- false, &true_data));
+ false, true, false, false, &true_data));
ASSERT_EQ(FilesPerLevel(), "1,0,0,5");
// File 5 overlaps with file 2 (L3 / base level)
- ASSERT_OK(GenerateAndAddExternalFile(options, {2, 10}, -1, false, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(options, {2, 10}, -1, false, false, true,
+ false, false, &true_data));
ASSERT_EQ(FilesPerLevel(), "2,0,0,5");
// File 6 overlaps with file 2 (L3 / base level) and file 5 (L0)
- ASSERT_OK(GenerateAndAddExternalFile(options, {3, 9}, -1, false, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(options, {3, 9}, -1, false, false, true,
+ false, false, &true_data));
ASSERT_EQ(FilesPerLevel(), "3,0,0,5");
// Verify data in files
// File 7 overlaps with file 4 (L3)
ASSERT_OK(GenerateAndAddExternalFile(options, {650, 651, 652}, -1, false,
- false, &true_data));
+ false, true, false, false, &true_data));
ASSERT_EQ(FilesPerLevel(), "5,0,0,5");
VerifyDBFromMap(true_data, &kcnt, false);
ASSERT_OK(DeprecatedAddFile({file_path}));
}
-TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
+TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
Options options = CurrentOptions();
options.IncreaseParallelism(20);
options.level0_slowdown_writes_trigger = 256;
options.level0_stop_writes_trigger = 256;
+ bool write_global_seqno = std::get<0>(GetParam());
+ bool verify_checksums_before_ingest = std::get<1>(GetParam());
for (int iter = 0; iter < 2; iter++) {
bool write_to_memtable = (iter == 0);
DestroyAndReopen(options);
true_data[entry.first] = entry.second;
}
} else {
- ASSERT_OK(GenerateAndAddExternalFile(options, random_data, -1, true,
- true, &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, random_data, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, true, &true_data));
}
}
size_t kcnt = 0;
}
}
-TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoAssignedLevel) {
+TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoAssignedLevel) {
Options options = CurrentOptions();
options.num_levels = 5;
options.disable_auto_compactions = true;
for (int i = 0; i <= 20; i++) {
file_data.emplace_back(Key(i), "L4");
}
- ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false,
- &true_data));
+ bool write_global_seqno = std::get<0>(GetParam());
+ bool verify_checksums_before_ingest = std::get<1>(GetParam());
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, file_data, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
// This file dont overlap with anything in the DB, will go to L4
ASSERT_EQ("0,0,0,0,1", FilesPerLevel());
for (int i = 80; i <= 130; i++) {
file_data.emplace_back(Key(i), "L0");
}
- ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, file_data, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
// This file overlap with the memtable, so it will flush it and add
// it self to L0
for (int i = 30; i <= 50; i++) {
file_data.emplace_back(Key(i), "L4");
}
- ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, file_data, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
// This file dont overlap with anything in the DB and fit in L4 as well
ASSERT_EQ("2,0,0,0,2", FilesPerLevel());
for (int i = 10; i <= 40; i++) {
file_data.emplace_back(Key(i), "L3");
}
- ASSERT_OK(GenerateAndAddExternalFile(options, file_data, -1, true, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, file_data, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
// This file overlap with files in L4, we will ingest it in L3
ASSERT_EQ("2,0,0,1,2", FilesPerLevel());
VerifyDBFromMap(true_data, &kcnt, false);
}
-TEST_F(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) {
+TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) {
Options options = CurrentOptions();
DestroyAndReopen(options);
uint64_t entries_in_memtable;
&entries_in_memtable);
ASSERT_GE(entries_in_memtable, 1);
+ bool write_global_seqno = std::get<0>(GetParam());
+ bool verify_checksums_before_ingest = std::get<1>(GetParam());
// No need for flush
- ASSERT_OK(GenerateAndAddExternalFile(options, {90, 100, 110}, -1, true, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {90, 100, 110}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_GE(entries_in_memtable, 1);
// This file will flush the memtable
- ASSERT_OK(GenerateAndAddExternalFile(options, {19, 20, 21}, -1, true, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {19, 20, 21}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_EQ(entries_in_memtable, 0);
ASSERT_GE(entries_in_memtable, 1);
// No need for flush, this file keys fit between the memtable keys
- ASSERT_OK(GenerateAndAddExternalFile(options, {202, 203, 204}, -1, true,
- false, &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {202, 203, 204}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_GE(entries_in_memtable, 1);
// This file will flush the memtable
- ASSERT_OK(GenerateAndAddExternalFile(options, {206, 207}, -1, true, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {206, 207}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false, &true_data));
db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&entries_in_memtable);
ASSERT_EQ(entries_in_memtable, 0);
VerifyDBFromMap(true_data, &kcnt, false);
}
-TEST_F(ExternalSSTFileTest, L0SortingIssue) {
+TEST_P(ExternalSSTFileTest, L0SortingIssue) {
Options options = CurrentOptions();
options.num_levels = 2;
DestroyAndReopen(options);
ASSERT_OK(Put(Key(1), "memtable"));
ASSERT_OK(Put(Key(10), "memtable"));
+ bool write_global_seqno = std::get<0>(GetParam());
+ bool verify_checksums_before_ingest = std::get<1>(GetParam());
// No Flush needed, No global seqno needed, Ingest in L1
- ASSERT_OK(GenerateAndAddExternalFile(options, {7, 8}, -1, true, false));
+ ASSERT_OK(
+ GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false));
// No Flush needed, but need a global seqno, Ingest in L0
- ASSERT_OK(GenerateAndAddExternalFile(options, {7, 8}, -1, true, false));
+ ASSERT_OK(
+ GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, false));
printf("%s\n", FilesPerLevel().c_str());
// Overwrite what we added using external files
std::vector<ExternalFileIngestionInfo> ingested_files;
};
-TEST_F(ExternalSSTFileTest, IngestionListener) {
+TEST_P(ExternalSSTFileTest, IngestionListener) {
Options options = CurrentOptions();
TestIngestExternalFileListener* listener =
new TestIngestExternalFileListener();
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"koko", "toto"}, options);
+ bool write_global_seqno = std::get<0>(GetParam());
+ bool verify_checksums_before_ingest = std::get<1>(GetParam());
// Ingest into default cf
- ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
- handles_[0]));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {1, 2}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, true, nullptr, handles_[0]));
ASSERT_EQ(listener->ingested_files.size(), 1);
ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
"default");
// Ingest into cf1
- ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
- handles_[1]));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {1, 2}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, true, nullptr, handles_[1]));
ASSERT_EQ(listener->ingested_files.size(), 2);
ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
"koko");
// Ingest into cf2
- ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
- handles_[2]));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, {1, 2}, -1, true, write_global_seqno,
+ verify_checksums_before_ingest, false, true, nullptr, handles_[2]));
ASSERT_EQ(listener->ingested_files.size(), 3);
ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
db_->ReleaseSnapshot(snap);
}
-TEST_F(ExternalSSTFileTest, IngestBehind) {
+TEST_P(ExternalSSTFileTest, IngestBehind) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 3;
file_data.emplace_back(Key(i), "ingest_behind");
}
- IngestExternalFileOptions ifo;
- ifo.allow_global_seqno = true;
- ifo.ingest_behind = true;
+ bool allow_global_seqno = true;
+ bool ingest_behind = true;
+ bool write_global_seqno = std::get<0>(GetParam());
+ bool verify_checksums_before_ingest = std::get<1>(GetParam());
// Can't ingest behind since allow_ingest_behind isn't set to true
- ASSERT_NOK(GenerateAndAddExternalFileIngestBehind(options, ifo,
- file_data, -1, false,
- &true_data));
+ ASSERT_NOK(GenerateAndAddExternalFile(
+ options, file_data, -1, allow_global_seqno, write_global_seqno,
+ verify_checksums_before_ingest, ingest_behind, false /*sort_data*/,
+ &true_data));
options.allow_ingest_behind = true;
// check that we still can open the DB, as num_levels should be
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Universal picker should go at second from the bottom level
ASSERT_EQ("0,1", FilesPerLevel());
- ASSERT_OK(GenerateAndAddExternalFileIngestBehind(options, ifo,
- file_data, -1, false,
- &true_data));
+ ASSERT_OK(GenerateAndAddExternalFile(
+ options, file_data, -1, allow_global_seqno, write_global_seqno,
+ verify_checksums_before_ingest, true /*ingest_behind*/,
+ false /*sort_data*/, &true_data));
ASSERT_EQ("0,1,1", FilesPerLevel());
// this time ingest should fail as the file doesn't fit to the bottom level
- ASSERT_NOK(GenerateAndAddExternalFileIngestBehind(options, ifo,
- file_data, -1, false,
- &true_data));
+ ASSERT_NOK(GenerateAndAddExternalFile(
+ options, file_data, -1, allow_global_seqno, write_global_seqno,
+ verify_checksums_before_ingest, true /*ingest_behind*/,
+ false /*sort_data*/, &true_data));
ASSERT_EQ("0,1,1", FilesPerLevel());
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// bottom level should be empty
}
}
+TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) {
+ if (!ZSTD_Supported()) {
+ return;
+ }
+ const int kNumEntries = 1 << 10;
+ const int kNumBytesPerEntry = 1 << 10;
+ Options options = CurrentOptions();
+ options.compression = kZSTD;
+ options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
+ options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
+ DestroyAndReopen(options);
+
+ std::atomic<int> num_compression_dicts(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
+ [&](void* /* arg */) { ++num_compression_dicts; });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ Random rnd(301);
+ std::vector<std::pair<std::string, std::string>> random_data;
+ for (int i = 0; i < kNumEntries; i++) {
+ std::string val;
+ test::RandomString(&rnd, kNumBytesPerEntry, &val);
+ random_data.emplace_back(Key(i), std::move(val));
+ }
+ ASSERT_OK(GenerateAndAddExternalFile(options, std::move(random_data)));
+ ASSERT_EQ(1, num_compression_dicts);
+}
+
+TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(env_));
+ Options options = CurrentOptions();
+ options.env = fault_injection_env.get();
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<ColumnFamilyHandle*> column_families;
+ column_families.push_back(handles_[0]);
+ column_families.push_back(handles_[1]);
+ std::vector<IngestExternalFileOptions> ifos(column_families.size());
+ for (auto& ifo : ifos) {
+ ifo.allow_global_seqno = true; // Always allow global_seqno
+ // May or may not write global_seqno
+ ifo.write_global_seqno = std::get<0>(GetParam());
+ // Whether to verify checksums before ingestion
+ ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
+ }
+ std::vector<std::vector<std::pair<std::string, std::string>>> data;
+ data.push_back(
+ {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
+ data.push_back(
+ {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+ // Resize the true_data vector upon construction to avoid re-alloc
+ std::vector<std::map<std::string, std::string>> true_data(
+ column_families.size());
+ Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
+ -1, true, true_data);
+ ASSERT_OK(s);
+ Close();
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ ASSERT_EQ(2, handles_.size());
+ int cf = 0;
+ for (const auto& verify_map : true_data) {
+ for (const auto& elem : verify_map) {
+ const std::string& key = elem.first;
+ const std::string& value = elem.second;
+ ASSERT_EQ(value, Get(cf, key));
+ }
+ ++cf;
+ }
+ Close();
+ Destroy(options, true /* delete_cf_paths */);
+}
+
+TEST_P(ExternalSSTFileTest,
+ IngestFilesIntoMultipleColumnFamilies_NoMixedStateWithSnapshot) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(env_));
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->LoadDependency({
+ {"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0",
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
+ "BeforeRead"},
+ {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
+ "AfterRead",
+ "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Options options = CurrentOptions();
+ options.env = fault_injection_env.get();
+ CreateAndReopenWithCF({"pikachu"}, options);
+ const std::vector<std::map<std::string, std::string>> data_before_ingestion =
+ {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
+ {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
+ for (size_t i = 0; i != handles_.size(); ++i) {
+ int cf = static_cast<int>(i);
+ const auto& orig_data = data_before_ingestion[i];
+ for (const auto& kv : orig_data) {
+ ASSERT_OK(Put(cf, kv.first, kv.second));
+ }
+ ASSERT_OK(Flush(cf));
+ }
+
+ std::vector<ColumnFamilyHandle*> column_families;
+ column_families.push_back(handles_[0]);
+ column_families.push_back(handles_[1]);
+ std::vector<IngestExternalFileOptions> ifos(column_families.size());
+ for (auto& ifo : ifos) {
+ ifo.allow_global_seqno = true; // Always allow global_seqno
+ // May or may not write global_seqno
+ ifo.write_global_seqno = std::get<0>(GetParam());
+ // Whether to verify checksums before ingestion
+ ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
+ }
+ std::vector<std::vector<std::pair<std::string, std::string>>> data;
+ data.push_back(
+ {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
+ data.push_back(
+ {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+ // Resize the true_data vector upon construction to avoid re-alloc
+ std::vector<std::map<std::string, std::string>> true_data(
+ column_families.size());
+ // Take snapshot before ingestion starts
+ ReadOptions read_opts;
+ read_opts.total_order_seek = true;
+ read_opts.snapshot = dbfull()->GetSnapshot();
+ std::vector<Iterator*> iters(handles_.size());
+
+ // Range scan checks first kv of each CF before ingestion starts.
+ for (size_t i = 0; i != handles_.size(); ++i) {
+ iters[i] = dbfull()->NewIterator(read_opts, handles_[i]);
+ iters[i]->SeekToFirst();
+ ASSERT_TRUE(iters[i]->Valid());
+ const std::string& key = iters[i]->key().ToString();
+ const std::string& value = iters[i]->value().ToString();
+ const std::map<std::string, std::string>& orig_data =
+ data_before_ingestion[i];
+ std::map<std::string, std::string>::const_iterator it = orig_data.find(key);
+ ASSERT_NE(orig_data.end(), it);
+ ASSERT_EQ(it->second, value);
+ iters[i]->Next();
+ }
+ port::Thread ingest_thread([&]() {
+ ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
+ -1, true, true_data));
+ });
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
+ "BeforeRead");
+ // Should see only data before ingestion
+ for (size_t i = 0; i != handles_.size(); ++i) {
+ const auto& orig_data = data_before_ingestion[i];
+ for (; iters[i]->Valid(); iters[i]->Next()) {
+ const std::string& key = iters[i]->key().ToString();
+ const std::string& value = iters[i]->value().ToString();
+ std::map<std::string, std::string>::const_iterator it =
+ orig_data.find(key);
+ ASSERT_NE(orig_data.end(), it);
+ ASSERT_EQ(it->second, value);
+ }
+ }
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
+ "AfterRead");
+ ingest_thread.join();
+ for (auto* iter : iters) {
+ delete iter;
+ }
+ iters.clear();
+ dbfull()->ReleaseSnapshot(read_opts.snapshot);
+
+ Close();
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ // Should see consistent state after ingestion for all column families even
+ // without snapshot.
+ ASSERT_EQ(2, handles_.size());
+ int cf = 0;
+ for (const auto& verify_map : true_data) {
+ for (const auto& elem : verify_map) {
+ const std::string& key = elem.first;
+ const std::string& value = elem.second;
+ ASSERT_EQ(value, Get(cf, key));
+ }
+ ++cf;
+ }
+ Close();
+ Destroy(options, true /* delete_cf_paths */);
+}
+
+TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(env_));
+ Options options = CurrentOptions();
+ options.env = fault_injection_env.get();
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->LoadDependency({
+ {"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0",
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
+ "0"},
+ {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
+ "1",
+ "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<ColumnFamilyHandle*> column_families;
+ column_families.push_back(handles_[0]);
+ column_families.push_back(handles_[1]);
+ std::vector<IngestExternalFileOptions> ifos(column_families.size());
+ for (auto& ifo : ifos) {
+ ifo.allow_global_seqno = true; // Always allow global_seqno
+ // May or may not write global_seqno
+ ifo.write_global_seqno = std::get<0>(GetParam());
+ // Whether to verify block checksums before ingest
+ ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
+ }
+ std::vector<std::vector<std::pair<std::string, std::string>>> data;
+ data.push_back(
+ {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
+ data.push_back(
+ {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+ // Resize the true_data vector upon construction to avoid re-alloc
+ std::vector<std::map<std::string, std::string>> true_data(
+ column_families.size());
+ port::Thread ingest_thread([&]() {
+ Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
+ -1, true, true_data);
+ ASSERT_NOK(s);
+ });
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
+ "0");
+ fault_injection_env->SetFilesystemActive(false);
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
+ "1");
+ ingest_thread.join();
+
+ fault_injection_env->SetFilesystemActive(true);
+ Close();
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ ASSERT_EQ(2, handles_.size());
+ int cf = 0;
+ for (const auto& verify_map : true_data) {
+ for (const auto& elem : verify_map) {
+ const std::string& key = elem.first;
+ ASSERT_EQ("NOT_FOUND", Get(cf, key));
+ }
+ ++cf;
+ }
+ Close();
+ Destroy(options, true /* delete_cf_paths */);
+}
+
+TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(env_));
+ Options options = CurrentOptions();
+ options.env = fault_injection_env.get();
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->LoadDependency({
+ {"DBImpl::IngestExternalFiles:BeforeJobsRun:0",
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
+ "0"},
+ {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
+ "1",
+ "DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<ColumnFamilyHandle*> column_families;
+ column_families.push_back(handles_[0]);
+ column_families.push_back(handles_[1]);
+ std::vector<IngestExternalFileOptions> ifos(column_families.size());
+ for (auto& ifo : ifos) {
+ ifo.allow_global_seqno = true; // Always allow global_seqno
+ // May or may not write global_seqno
+ ifo.write_global_seqno = std::get<0>(GetParam());
+ // Whether to verify block checksums before ingestion
+ ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
+ }
+ std::vector<std::vector<std::pair<std::string, std::string>>> data;
+ data.push_back(
+ {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
+ data.push_back(
+ {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+ // Resize the true_data vector upon construction to avoid re-alloc
+ std::vector<std::map<std::string, std::string>> true_data(
+ column_families.size());
+ port::Thread ingest_thread([&]() {
+ Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
+ -1, true, true_data);
+ ASSERT_NOK(s);
+ });
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
+ "0");
+ fault_injection_env->SetFilesystemActive(false);
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
+ "1");
+ ingest_thread.join();
+
+ fault_injection_env->SetFilesystemActive(true);
+ Close();
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ ASSERT_EQ(2, handles_.size());
+ int cf = 0;
+ for (const auto& verify_map : true_data) {
+ for (const auto& elem : verify_map) {
+ const std::string& key = elem.first;
+ ASSERT_EQ("NOT_FOUND", Get(cf, key));
+ }
+ ++cf;
+ }
+ Close();
+ Destroy(options, true /* delete_cf_paths */);
+}
+
+TEST_P(ExternalSSTFileTest,
+ IngestFilesIntoMultipleColumnFamilies_PartialManifestWriteFail) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(env_));
+ Options options = CurrentOptions();
+ options.env = fault_injection_env.get();
+
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ SyncPoint::GetInstance()->ClearTrace();
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->LoadDependency({
+ {"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
+ "PartialManifestWriteFail:0"},
+ {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
+ "PartialManifestWriteFail:1",
+ "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<ColumnFamilyHandle*> column_families;
+ column_families.push_back(handles_[0]);
+ column_families.push_back(handles_[1]);
+ std::vector<IngestExternalFileOptions> ifos(column_families.size());
+ for (auto& ifo : ifos) {
+ ifo.allow_global_seqno = true; // Always allow global_seqno
+ // May or may not write global_seqno
+ ifo.write_global_seqno = std::get<0>(GetParam());
+ // Whether to verify block checksums before ingestion
+ ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
+ }
+ std::vector<std::vector<std::pair<std::string, std::string>>> data;
+ data.push_back(
+ {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
+ data.push_back(
+ {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
+ // Resize the true_data vector upon construction to avoid re-alloc
+ std::vector<std::map<std::string, std::string>> true_data(
+ column_families.size());
+ port::Thread ingest_thread([&]() {
+ Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
+ -1, true, true_data);
+ ASSERT_NOK(s);
+ });
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
+ "PartialManifestWriteFail:0");
+ fault_injection_env->SetFilesystemActive(false);
+ TEST_SYNC_POINT(
+ "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
+ "PartialManifestWriteFail:1");
+ ingest_thread.join();
+
+ fault_injection_env->DropUnsyncedFileData();
+ fault_injection_env->SetFilesystemActive(true);
+ Close();
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ ASSERT_EQ(2, handles_.size());
+ int cf = 0;
+ for (const auto& verify_map : true_data) {
+ for (const auto& elem : verify_map) {
+ const std::string& key = elem.first;
+ ASSERT_EQ("NOT_FOUND", Get(cf, key));
+ }
+ ++cf;
+ }
+ Close();
+ Destroy(options, true /* delete_cf_paths */);
+}
+
+INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
+ testing::Values(std::make_tuple(false, false),
+ std::make_tuple(false, true),
+ std::make_tuple(true, false),
+ std::make_tuple(true, true)));
+
} // namespace rocksdb
int main(int argc, char** argv) {