]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_callback_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / write_callback_test.cc
index cb880560efc9d85c35631e8589b8a9aad6228147..df7d673aa7ebc3cf328bd27d31c0c7a065c05937 100644 (file)
 #include <utility>
 #include <vector>
 
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
 #include "db/write_callback.h"
+#include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/write_batch.h"
-#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
 #include "util/random.h"
-#include "util/sync_point.h"
-#include "util/testharness.h"
 
 using std::string;
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 class WriteCallbackTest : public testing::Test {
  public:
@@ -124,6 +124,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
       {false, false, true, false, true},
   };
 
+  for (auto& unordered_write : {true, false}) {
   for (auto& seq_per_batch : {true, false}) {
   for (auto& two_queues : {true, false}) {
     for (auto& allow_parallel : {true, false}) {
@@ -133,15 +134,22 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
             for (auto& write_group : write_scenarios) {
               Options options;
               options.create_if_missing = true;
+              options.unordered_write = unordered_write;
               options.allow_concurrent_memtable_write = allow_parallel;
               options.enable_pipelined_write = enable_pipelined_write;
               options.two_write_queues = two_queues;
+              // Skip unsupported combinations
               if (options.enable_pipelined_write && seq_per_batch) {
-                // This combination is not supported
                 continue;
               }
               if (options.enable_pipelined_write && options.two_write_queues) {
-                // This combination is not supported
+                continue;
+              }
+              if (options.unordered_write &&
+                  !options.allow_concurrent_memtable_write) {
+                continue;
+              }
+              if (options.unordered_write && options.enable_pipelined_write) {
                 continue;
               }
 
@@ -177,7 +185,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
               std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
               ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
 
-              rocksdb::SyncPoint::GetInstance()->SetCallBack(
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
                   "WriteThread::JoinBatchGroup:Start", [&](void*) {
                     uint64_t cur_threads_joining = threads_joining.fetch_add(1);
                     // Wait for the last joined writer to link to the queue.
@@ -189,7 +197,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
                   });
 
               // Verification once writers call JoinBatchGroup.
-              rocksdb::SyncPoint::GetInstance()->SetCallBack(
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
                   "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
                     uint64_t cur_threads_linked = threads_linked.fetch_add(1);
                     bool is_leader = false;
@@ -228,7 +236,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
                     }
                   });
 
-              rocksdb::SyncPoint::GetInstance()->SetCallBack(
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
                   "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
                     // check my state
                     auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
@@ -296,7 +304,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
                     PublishSeqCallback(DBImpl* db_impl_in)
                         : db_impl_(db_impl_in) {}
                     Status Callback(SequenceNumber last_seq, bool /*not used*/,
-                                    uint64_t) override {
+                                    uint64_t, size_t /*index*/,
+                                    size_t /*total*/) override {
                       db_impl_->SetLastPublishedSequence(last_seq);
                       return Status::OK();
                     }
@@ -321,7 +330,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
                 }
               };
 
-              rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
               // do all the writes
               std::vector<port::Thread> threads;
@@ -332,7 +341,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
                 t.join();
               }
 
-              rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+              ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 
               // check for keys
               string value;
@@ -358,8 +367,9 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
         }
       }
     }
-}
-}
+  }
+  }
+  }
 }
 
 TEST_F(WriteCallbackTest, WriteCallBackTest) {
@@ -423,7 +433,7 @@ TEST_F(WriteCallbackTest, WriteCallBackTest) {
   DestroyDB(dbname, options);
 }
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);