]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/flush_job_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / flush_job_test.cc
index 041edeaa4b17b1cdc3f9f369deefdc03a85bf91a..199ed29cacce17c1ae48f46e5519c71b64ef87f1 100644 (file)
@@ -30,6 +30,7 @@ class FlushJobTest : public testing::Test {
         dbname_(test::PerThreadDBPath("flush_job_test")),
         options_(),
         db_options_(options_),
+        column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
         table_cache_(NewLRUCache(50000, 16)),
         write_buffer_manager_(db_options_.db_write_buffer_size),
         versions_(new VersionSet(dbname_, &db_options_, env_options_,
@@ -45,7 +46,9 @@ class FlushJobTest : public testing::Test {
     NewDB();
     std::vector<ColumnFamilyDescriptor> column_families;
     cf_options_.table_factory = mock_table_factory_;
-    column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
+    for (const auto& cf_name : column_family_names_) {
+      column_families.emplace_back(cf_name, cf_options_);
+    }
 
     EXPECT_OK(versions_->Recover(column_families, false));
   }
@@ -56,18 +59,38 @@ class FlushJobTest : public testing::Test {
     new_db.SetNextFile(2);
     new_db.SetLastSequence(0);
 
+    autovector<VersionEdit> new_cfs;
+    SequenceNumber last_seq = 1;
+    uint32_t cf_id = 1;
+    for (size_t i = 1; i != column_family_names_.size(); ++i) {
+      VersionEdit new_cf;
+      new_cf.AddColumnFamily(column_family_names_[i]);
+      new_cf.SetColumnFamily(cf_id++);
+      new_cf.SetLogNumber(0);
+      new_cf.SetNextFile(2);
+      new_cf.SetLastSequence(last_seq++);
+      new_cfs.emplace_back(new_cf);
+    }
+
     const std::string manifest = DescriptorFileName(dbname_, 1);
-    unique_ptr<WritableFile> file;
+    std::unique_ptr<WritableFile> file;
     Status s = env_->NewWritableFile(
         manifest, &file, env_->OptimizeForManifestWrite(env_options_));
     ASSERT_OK(s);
-    unique_ptr<WritableFileWriter> file_writer(
+    std::unique_ptr<WritableFileWriter> file_writer(
         new WritableFileWriter(std::move(file), manifest, EnvOptions()));
     {
       log::Writer log(std::move(file_writer), 0, false);
       std::string record;
       new_db.EncodeTo(&record);
       s = log.AddRecord(record);
+
+      for (const auto& e : new_cfs) {
+        record.clear();
+        e.EncodeTo(&record);
+        s = log.AddRecord(record);
+        ASSERT_OK(s);
+      }
     }
     ASSERT_OK(s);
     // Make "CURRENT" file that points to the new manifest file.
@@ -79,6 +102,7 @@ class FlushJobTest : public testing::Test {
   EnvOptions env_options_;
   Options options_;
   ImmutableDBOptions db_options_;
+  const std::vector<std::string> column_family_names_;
   std::shared_ptr<Cache> table_cache_;
   WriteController write_controller_;
   WriteBufferManager write_buffer_manager_;
@@ -94,11 +118,14 @@ TEST_F(FlushJobTest, Empty) {
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
   EventLogger event_logger(db_options_.info_log.get());
   SnapshotChecker* snapshot_checker = nullptr;  // not relavant
-  FlushJob flush_job(
-      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
-      *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_,
-      &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context,
-      nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false);
+  FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
+                     db_options_, *cfd->GetLatestMutableCFOptions(),
+                     nullptr /* memtable_id */, env_options_, versions_.get(),
+                     &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
+                     snapshot_checker, &job_context, nullptr, nullptr, nullptr,
+                     kNoCompression, nullptr, &event_logger, false,
+                     true /* sync_output_directory */,
+                     true /* write_manifest */, Env::Priority::USER);
   {
     InstrumentedMutexLock l(&mutex_);
     flush_job.PickMemTable();
@@ -141,10 +168,12 @@ TEST_F(FlushJobTest, NonEmpty) {
   SnapshotChecker* snapshot_checker = nullptr;  // not relavant
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
-                     env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     {}, kMaxSequenceNumber, snapshot_checker, &job_context,
-                     nullptr, nullptr, nullptr, kNoCompression,
-                     db_options_.statistics.get(), &event_logger, true);
+                     nullptr /* memtable_id */, env_options_, versions_.get(),
+                     &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
+                     snapshot_checker, &job_context, nullptr, nullptr, nullptr,
+                     kNoCompression, db_options_.statistics.get(),
+                     &event_logger, true, true /* sync_output_directory */,
+                     true /* write_manifest */, Env::Priority::USER);
 
   HistogramData hist;
   FileMetaData file_meta;
@@ -165,23 +194,198 @@ TEST_F(FlushJobTest, NonEmpty) {
   job_context.Clean();
 }
 
+TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
+  const size_t num_mems = 2;
+  const size_t num_mems_to_flush = 1;
+  const size_t num_keys_per_table = 100;
+  JobContext job_context(0);
+  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
+  std::vector<uint64_t> memtable_ids;
+  std::vector<MemTable*> new_mems;
+  for (size_t i = 0; i != num_mems; ++i) {
+    MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                              kMaxSequenceNumber);
+    mem->SetID(i);
+    mem->Ref();
+    new_mems.emplace_back(mem);
+    memtable_ids.push_back(mem->GetID());
+
+    for (size_t j = 0; j < num_keys_per_table; ++j) {
+      std::string key(ToString(j + i * num_keys_per_table));
+      std::string value("value" + key);
+      mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key,
+               value);
+    }
+  }
+
+  autovector<MemTable*> to_delete;
+  for (auto mem : new_mems) {
+    cfd->imm()->Add(mem, &to_delete);
+  }
+
+  EventLogger event_logger(db_options_.info_log.get());
+  SnapshotChecker* snapshot_checker = nullptr;  // not relavant
+
+  assert(memtable_ids.size() == num_mems);
+  uint64_t smallest_memtable_id = memtable_ids.front();
+  uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
+
+  FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
+                     db_options_, *cfd->GetLatestMutableCFOptions(),
+                     &flush_memtable_id, env_options_, versions_.get(), &mutex_,
+                     &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
+                     &job_context, nullptr, nullptr, nullptr, kNoCompression,
+                     db_options_.statistics.get(), &event_logger, true,
+                     true /* sync_output_directory */,
+                     true /* write_manifest */, Env::Priority::USER);
+  HistogramData hist;
+  FileMetaData file_meta;
+  mutex_.Lock();
+  flush_job.PickMemTable();
+  ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta));
+  mutex_.Unlock();
+  db_options_.statistics->histogramData(FLUSH_TIME, &hist);
+  ASSERT_GT(hist.average, 0.0);
+
+  ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
+  ASSERT_EQ("99", file_meta.largest.user_key().ToString());
+  ASSERT_EQ(0, file_meta.fd.smallest_seqno);
+  ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1),
+            file_meta.fd.largest_seqno);
+
+  for (auto m : to_delete) {
+    delete m;
+  }
+  to_delete.clear();
+  job_context.Clean();
+}
+
+TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
+  autovector<ColumnFamilyData*> all_cfds;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    all_cfds.push_back(cfd);
+  }
+  const std::vector<size_t> num_memtables = {2, 1, 3};
+  assert(num_memtables.size() == column_family_names_.size());
+  const size_t num_keys_per_memtable = 1000;
+  JobContext job_context(0);
+  std::vector<uint64_t> memtable_ids;
+  std::vector<SequenceNumber> smallest_seqs;
+  std::vector<SequenceNumber> largest_seqs;
+  autovector<MemTable*> to_delete;
+  SequenceNumber curr_seqno = 0;
+  size_t k = 0;
+  for (auto cfd : all_cfds) {
+    smallest_seqs.push_back(curr_seqno);
+    for (size_t i = 0; i != num_memtables[k]; ++i) {
+      MemTable* mem = cfd->ConstructNewMemtable(
+          *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
+      mem->SetID(i);
+      mem->Ref();
+
+      for (size_t j = 0; j != num_keys_per_memtable; ++j) {
+        std::string key(ToString(j + i * num_keys_per_memtable));
+        std::string value("value" + key);
+        mem->Add(curr_seqno++, kTypeValue, key, value);
+      }
+
+      cfd->imm()->Add(mem, &to_delete);
+    }
+    largest_seqs.push_back(curr_seqno - 1);
+    memtable_ids.push_back(num_memtables[k++] - 1);
+  }
+
+  EventLogger event_logger(db_options_.info_log.get());
+  SnapshotChecker* snapshot_checker = nullptr;  // not relevant
+  std::vector<FlushJob> flush_jobs;
+  k = 0;
+  for (auto cfd : all_cfds) {
+    std::vector<SequenceNumber> snapshot_seqs;
+    flush_jobs.emplace_back(
+        dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
+        &memtable_ids[k], env_options_, versions_.get(), &mutex_,
+        &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
+        &job_context, nullptr, nullptr, nullptr, kNoCompression,
+        db_options_.statistics.get(), &event_logger, true,
+        false /* sync_output_directory */, false /* write_manifest */,
+        Env::Priority::USER);
+    k++;
+  }
+  HistogramData hist;
+  std::vector<FileMetaData> file_metas;
+  // Call reserve to avoid auto-resizing
+  file_metas.reserve(flush_jobs.size());
+  mutex_.Lock();
+  for (auto& job : flush_jobs) {
+    job.PickMemTable();
+  }
+  for (auto& job : flush_jobs) {
+    FileMetaData meta;
+    // Run will release and re-acquire  mutex
+    ASSERT_OK(job.Run(nullptr /**/, &meta));
+    file_metas.emplace_back(meta);
+  }
+  autovector<FileMetaData*> file_meta_ptrs;
+  for (auto& meta : file_metas) {
+    file_meta_ptrs.push_back(&meta);
+  }
+  autovector<const autovector<MemTable*>*> mems_list;
+  for (size_t i = 0; i != all_cfds.size(); ++i) {
+    const auto& mems = flush_jobs[i].GetMemTables();
+    mems_list.push_back(&mems);
+  }
+  autovector<const MutableCFOptions*> mutable_cf_options_list;
+  for (auto cfd : all_cfds) {
+    mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
+  }
+
+  Status s = InstallMemtableAtomicFlushResults(
+      nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
+      versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
+      nullptr /* db_directory */, nullptr /* log_buffer */);
+  ASSERT_OK(s);
+
+  mutex_.Unlock();
+  db_options_.statistics->histogramData(FLUSH_TIME, &hist);
+  ASSERT_GT(hist.average, 0.0);
+  k = 0;
+  for (const auto& file_meta : file_metas) {
+    ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
+    ASSERT_EQ("999", file_meta.largest.user_key()
+                         .ToString());  // max key by bytewise comparator
+    ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno);
+    ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno);
+    // Verify that imm is empty
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              all_cfds[k]->imm()->GetEarliestMemTableID());
+    ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID());
+    ++k;
+  }
+
+  for (auto m : to_delete) {
+    delete m;
+  }
+  to_delete.clear();
+  job_context.Clean();
+}
+
 TEST_F(FlushJobTest, Snapshots) {
   JobContext job_context(0);
   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
   auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                            kMaxSequenceNumber);
 
-  std::vector<SequenceNumber> snapshots;
   std::set<SequenceNumber> snapshots_set;
   int keys = 10000;
   int max_inserts_per_keys = 8;
 
   Random rnd(301);
   for (int i = 0; i < keys / 2; ++i) {
-    snapshots.push_back(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
-    snapshots_set.insert(snapshots.back());
+    snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
   }
-  std::sort(snapshots.begin(), snapshots.end());
+  // set has already removed the duplicate snapshots
+  std::vector<SequenceNumber> snapshots(snapshots_set.begin(),
+                                        snapshots_set.end());
 
   new_mem->Ref();
   SequenceNumber current_seqno = 0;
@@ -215,10 +419,12 @@ TEST_F(FlushJobTest, Snapshots) {
   SnapshotChecker* snapshot_checker = nullptr;  // not relavant
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
-                     env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     snapshots, kMaxSequenceNumber, snapshot_checker,
-                     &job_context, nullptr, nullptr, nullptr, kNoCompression,
-                     db_options_.statistics.get(), &event_logger, true);
+                     nullptr /* memtable_id */, env_options_, versions_.get(),
+                     &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
+                     snapshot_checker, &job_context, nullptr, nullptr, nullptr,
+                     kNoCompression, db_options_.statistics.get(),
+                     &event_logger, true, true /* sync_output_directory */,
+                     true /* write_manifest */, Env::Priority::USER);
   mutex_.Lock();
   flush_job.PickMemTable();
   ASSERT_OK(flush_job.Run());