1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/db_impl_secondary.h"
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "util/fault_injection_test_env.h"
14 #include "util/sync_point.h"
19 class DBSecondaryTest
: public DBTestBase
{
22 : DBTestBase("/db_secondary_test"),
25 db_secondary_(nullptr) {
27 test::PerThreadDBPath(env_
, "/db_secondary_test_secondary");
30 ~DBSecondaryTest() override
{
32 if (getenv("KEEP_DB") != nullptr) {
33 fprintf(stdout
, "Secondary DB is still at %s\n", secondary_path_
.c_str());
37 EXPECT_OK(DestroyDB(secondary_path_
, options
));
42 Status
ReopenAsSecondary(const Options
& options
) {
43 return DB::OpenAsSecondary(options
, dbname_
, secondary_path_
, &db_
);
46 void OpenSecondary(const Options
& options
);
48 void OpenSecondaryWithColumnFamilies(
49 const std::vector
<std::string
>& column_families
, const Options
& options
);
51 void CloseSecondary() {
52 for (auto h
: handles_secondary_
) {
53 db_secondary_
->DestroyColumnFamilyHandle(h
);
55 handles_secondary_
.clear();
57 db_secondary_
= nullptr;
60 DBImplSecondary
* db_secondary_full() {
61 return static_cast<DBImplSecondary
*>(db_secondary_
);
64 void CheckFileTypeCounts(const std::string
& dir
, int expected_log
,
65 int expected_sst
, int expected_manifest
) const;
67 std::string secondary_path_
;
68 std::vector
<ColumnFamilyHandle
*> handles_secondary_
;
72 void DBSecondaryTest::OpenSecondary(const Options
& options
) {
74 DB::OpenAsSecondary(options
, dbname_
, secondary_path_
, &db_secondary_
);
78 void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
79 const std::vector
<std::string
>& column_families
, const Options
& options
) {
80 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
81 cf_descs
.emplace_back(kDefaultColumnFamilyName
, options
);
82 for (const auto& cf_name
: column_families
) {
83 cf_descs
.emplace_back(cf_name
, options
);
85 Status s
= DB::OpenAsSecondary(options
, dbname_
, secondary_path_
, cf_descs
,
86 &handles_secondary_
, &db_secondary_
);
90 void DBSecondaryTest::CheckFileTypeCounts(const std::string
& dir
,
91 int expected_log
, int expected_sst
,
92 int expected_manifest
) const {
93 std::vector
<std::string
> filenames
;
94 env_
->GetChildren(dir
, &filenames
);
96 int log_cnt
= 0, sst_cnt
= 0, manifest_cnt
= 0;
97 for (auto file
: filenames
) {
100 if (ParseFileName(file
, &number
, &type
)) {
101 log_cnt
+= (type
== kLogFile
);
102 sst_cnt
+= (type
== kTableFile
);
103 manifest_cnt
+= (type
== kDescriptorFile
);
106 ASSERT_EQ(expected_log
, log_cnt
);
107 ASSERT_EQ(expected_sst
, sst_cnt
);
108 ASSERT_EQ(expected_manifest
, manifest_cnt
);
111 TEST_F(DBSecondaryTest
, ReopenAsSecondary
) {
115 ASSERT_OK(Put("foo", "foo_value"));
116 ASSERT_OK(Put("bar", "bar_value"));
117 ASSERT_OK(dbfull()->Flush(FlushOptions()));
120 ASSERT_OK(ReopenAsSecondary(options
));
121 ASSERT_EQ("foo_value", Get("foo"));
122 ASSERT_EQ("bar_value", Get("bar"));
124 ropts
.verify_checksums
= true;
125 auto db1
= static_cast<DBImplSecondary
*>(db_
);
126 ASSERT_NE(nullptr, db1
);
127 Iterator
* iter
= db1
->NewIterator(ropts
);
128 ASSERT_NE(nullptr, iter
);
130 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
132 ASSERT_EQ("bar", iter
->key().ToString());
133 ASSERT_EQ("bar_value", iter
->value().ToString());
134 } else if (1 == count
) {
135 ASSERT_EQ("foo", iter
->key().ToString());
136 ASSERT_EQ("foo_value", iter
->value().ToString());
144 TEST_F(DBSecondaryTest
, OpenAsSecondary
) {
147 options
.level0_file_num_compaction_trigger
= 4;
149 for (int i
= 0; i
< 3; ++i
) {
150 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i
)));
151 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i
)));
156 options1
.max_open_files
= -1;
157 OpenSecondary(options1
);
158 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
159 ASSERT_OK(dbfull()->TEST_WaitForCompact());
162 ropts
.verify_checksums
= true;
163 const auto verify_db_func
= [&](const std::string
& foo_val
,
164 const std::string
& bar_val
) {
166 ASSERT_OK(db_secondary_
->Get(ropts
, "foo", &value
));
167 ASSERT_EQ(foo_val
, value
);
168 ASSERT_OK(db_secondary_
->Get(ropts
, "bar", &value
));
169 ASSERT_EQ(bar_val
, value
);
170 Iterator
* iter
= db_secondary_
->NewIterator(ropts
);
171 ASSERT_NE(nullptr, iter
);
173 ASSERT_TRUE(iter
->Valid());
174 ASSERT_EQ("foo", iter
->key().ToString());
175 ASSERT_EQ(foo_val
, iter
->value().ToString());
177 ASSERT_TRUE(iter
->Valid());
178 ASSERT_EQ("bar", iter
->key().ToString());
179 ASSERT_EQ(bar_val
, iter
->value().ToString());
181 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
188 verify_db_func("foo_value2", "bar_value2");
190 ASSERT_OK(Put("foo", "new_foo_value"));
191 ASSERT_OK(Put("bar", "new_bar_value"));
194 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
195 verify_db_func("new_foo_value", "new_bar_value");
198 TEST_F(DBSecondaryTest
, OpenWithNonExistColumnFamily
) {
201 CreateAndReopenWithCF({"pikachu"}, options
);
205 options1
.max_open_files
= -1;
206 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
207 cf_descs
.emplace_back(kDefaultColumnFamilyName
, options1
);
208 cf_descs
.emplace_back("pikachu", options1
);
209 cf_descs
.emplace_back("eevee", options1
);
210 Status s
= DB::OpenAsSecondary(options1
, dbname_
, secondary_path_
, cf_descs
,
211 &handles_secondary_
, &db_secondary_
);
215 TEST_F(DBSecondaryTest
, OpenWithSubsetOfColumnFamilies
) {
218 CreateAndReopenWithCF({"pikachu"}, options
);
221 options1
.max_open_files
= -1;
222 OpenSecondary(options1
);
223 ASSERT_EQ(0, handles_secondary_
.size());
224 ASSERT_NE(nullptr, db_secondary_
);
226 ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
227 ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
228 ASSERT_OK(Flush(0 /*cf*/));
229 ASSERT_OK(Flush(1 /*cf*/));
230 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
232 ropts
.verify_checksums
= true;
234 ASSERT_OK(db_secondary_
->Get(ropts
, "foo", &value
));
235 ASSERT_EQ("foo_value", value
);
238 TEST_F(DBSecondaryTest
, SwitchToNewManifestDuringOpen
) {
244 SyncPoint::GetInstance()->DisableProcessing();
245 SyncPoint::GetInstance()->ClearAllCallBacks();
246 SyncPoint::GetInstance()->LoadDependency(
247 {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
248 "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
249 {"VersionSet::ProcessManifestWrites:AfterNewManifest",
250 "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
252 SyncPoint::GetInstance()->EnableProcessing();
254 // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
255 // which causes the db to switch to a new MANIFEST upon start.
256 port::Thread
ro_db_thread([&]() {
259 options1
.max_open_files
= -1;
260 OpenSecondary(options1
);
267 TEST_F(DBSecondaryTest
, MissingTableFileDuringOpen
) {
270 options
.level0_file_num_compaction_trigger
= 4;
272 for (int i
= 0; i
!= options
.level0_file_num_compaction_trigger
; ++i
) {
273 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i
)));
274 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i
)));
275 ASSERT_OK(dbfull()->Flush(FlushOptions()));
277 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
278 ASSERT_OK(dbfull()->TEST_WaitForCompact());
281 options1
.max_open_files
= -1;
282 OpenSecondary(options1
);
284 ropts
.verify_checksums
= true;
286 ASSERT_OK(db_secondary_
->Get(ropts
, "foo", &value
));
287 ASSERT_EQ("foo_value" +
288 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
290 ASSERT_OK(db_secondary_
->Get(ropts
, "bar", &value
));
291 ASSERT_EQ("bar_value" +
292 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
294 Iterator
* iter
= db_secondary_
->NewIterator(ropts
);
295 ASSERT_NE(nullptr, iter
);
297 ASSERT_TRUE(iter
->Valid());
298 ASSERT_EQ("bar", iter
->key().ToString());
299 ASSERT_EQ("bar_value" +
300 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
301 iter
->value().ToString());
303 ASSERT_TRUE(iter
->Valid());
304 ASSERT_EQ("foo", iter
->key().ToString());
305 ASSERT_EQ("foo_value" +
306 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
307 iter
->value().ToString());
309 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
316 TEST_F(DBSecondaryTest
, MissingTableFile
) {
317 int table_files_not_exist
= 0;
318 SyncPoint::GetInstance()->DisableProcessing();
319 SyncPoint::GetInstance()->ClearAllCallBacks();
320 SyncPoint::GetInstance()->SetCallBack(
321 "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers",
323 Status s
= *reinterpret_cast<Status
*>(arg
);
324 if (s
.IsPathNotFound()) {
325 ++table_files_not_exist
;
326 } else if (!s
.ok()) {
327 assert(false); // Should not reach here
330 SyncPoint::GetInstance()->EnableProcessing();
333 options
.level0_file_num_compaction_trigger
= 4;
338 options1
.max_open_files
= -1;
339 OpenSecondary(options1
);
341 for (int i
= 0; i
!= options
.level0_file_num_compaction_trigger
; ++i
) {
342 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i
)));
343 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i
)));
344 ASSERT_OK(dbfull()->Flush(FlushOptions()));
346 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
347 ASSERT_OK(dbfull()->TEST_WaitForCompact());
349 ASSERT_NE(nullptr, db_secondary_full());
351 ropts
.verify_checksums
= true;
353 ASSERT_NOK(db_secondary_
->Get(ropts
, "foo", &value
));
354 ASSERT_NOK(db_secondary_
->Get(ropts
, "bar", &value
));
356 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
357 ASSERT_EQ(options
.level0_file_num_compaction_trigger
, table_files_not_exist
);
358 ASSERT_OK(db_secondary_
->Get(ropts
, "foo", &value
));
359 ASSERT_EQ("foo_value" +
360 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
362 ASSERT_OK(db_secondary_
->Get(ropts
, "bar", &value
));
363 ASSERT_EQ("bar_value" +
364 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
366 Iterator
* iter
= db_secondary_
->NewIterator(ropts
);
367 ASSERT_NE(nullptr, iter
);
369 ASSERT_TRUE(iter
->Valid());
370 ASSERT_EQ("bar", iter
->key().ToString());
371 ASSERT_EQ("bar_value" +
372 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
373 iter
->value().ToString());
375 ASSERT_TRUE(iter
->Valid());
376 ASSERT_EQ("foo", iter
->key().ToString());
377 ASSERT_EQ("foo_value" +
378 std::to_string(options
.level0_file_num_compaction_trigger
- 1),
379 iter
->value().ToString());
381 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
388 TEST_F(DBSecondaryTest
, PrimaryDropColumnFamily
) {
391 const std::string kCfName1
= "pikachu";
392 CreateAndReopenWithCF({kCfName1
}, options
);
396 options1
.max_open_files
= -1;
397 OpenSecondaryWithColumnFamilies({kCfName1
}, options1
);
398 ASSERT_EQ(2, handles_secondary_
.size());
400 ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
401 ASSERT_OK(Flush(1 /*cf*/));
403 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
405 ropts
.verify_checksums
= true;
407 ASSERT_OK(db_secondary_
->Get(ropts
, handles_secondary_
[1], "foo", &value
));
408 ASSERT_EQ("foo_val_1", value
);
410 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
412 CheckFileTypeCounts(dbname_
, 1, 0, 1);
413 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
415 ASSERT_OK(db_secondary_
->Get(ropts
, handles_secondary_
[1], "foo", &value
));
416 ASSERT_EQ("foo_val_1", value
);
419 TEST_F(DBSecondaryTest
, SwitchManifest
) {
422 options
.level0_file_num_compaction_trigger
= 4;
427 options1
.max_open_files
= -1;
428 OpenSecondary(options1
);
430 const int kNumFiles
= options
.level0_file_num_compaction_trigger
- 1;
431 // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
433 const int kNumKeys
= 10;
435 for (int i
= 0; i
!= kNumFiles
; ++i
) {
436 for (int j
= 0; j
!= kNumKeys
; ++j
) {
437 ASSERT_OK(Put("key" + std::to_string(j
), "value_" + std::to_string(i
)));
442 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
443 const auto& range_scan_db
= [&]() {
444 ReadOptions tmp_ropts
;
445 tmp_ropts
.total_order_seek
= true;
446 tmp_ropts
.verify_checksums
= true;
447 std::unique_ptr
<Iterator
> iter(db_secondary_
->NewIterator(tmp_ropts
));
449 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next(), ++cnt
) {
450 ASSERT_EQ("key" + std::to_string(cnt
), iter
->key().ToString());
451 ASSERT_EQ("value_" + std::to_string(kNumFiles
- 1),
452 iter
->value().ToString());
458 // While secondary instance still keeps old MANIFEST open, we close primary,
459 // restart primary, performs full compaction, close again, restart again so
460 // that next time secondary tries to catch up with primary, the secondary
461 // will skip the MANIFEST in middle.
463 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
464 ASSERT_OK(dbfull()->TEST_WaitForCompact());
467 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
469 ASSERT_OK(db_secondary_
->TryCatchUpWithPrimary());
472 #endif //! ROCKSDB_LITE
474 } // namespace rocksdb
476 int main(int argc
, char** argv
) {
477 rocksdb::port::InstallStackTraceHandler();
478 ::testing::InitGoogleTest(&argc
, argv
);
479 return RUN_ALL_TESTS();