]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_impl/db_secondary_test.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_secondary_test.cc
CommitLineData
494da23a
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
f67539c2 10#include "db/db_impl/db_impl_secondary.h"
494da23a
TL
11#include "db/db_test_util.h"
12#include "port/stack_trace.h"
f67539c2 13#include "test_util/sync_point.h"
20effc67 14#include "utilities/fault_injection_env.h"
494da23a 15
f67539c2 16namespace ROCKSDB_NAMESPACE {
494da23a
TL
17
18#ifndef ROCKSDB_LITE
19class DBSecondaryTest : public DBTestBase {
20 public:
21 DBSecondaryTest()
20effc67 22 : DBTestBase("/db_secondary_test", /*env_do_fsync=*/true),
494da23a
TL
23 secondary_path_(),
24 handles_secondary_(),
25 db_secondary_(nullptr) {
26 secondary_path_ =
27 test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
28 }
29
30 ~DBSecondaryTest() override {
31 CloseSecondary();
32 if (getenv("KEEP_DB") != nullptr) {
33 fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
34 } else {
35 Options options;
36 options.env = env_;
37 EXPECT_OK(DestroyDB(secondary_path_, options));
38 }
39 }
40
41 protected:
42 Status ReopenAsSecondary(const Options& options) {
43 return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
44 }
45
46 void OpenSecondary(const Options& options);
47
20effc67
TL
48 Status TryOpenSecondary(const Options& options);
49
494da23a
TL
50 void OpenSecondaryWithColumnFamilies(
51 const std::vector<std::string>& column_families, const Options& options);
52
53 void CloseSecondary() {
54 for (auto h : handles_secondary_) {
20effc67 55 ASSERT_OK(db_secondary_->DestroyColumnFamilyHandle(h));
494da23a
TL
56 }
57 handles_secondary_.clear();
58 delete db_secondary_;
59 db_secondary_ = nullptr;
60 }
61
62 DBImplSecondary* db_secondary_full() {
63 return static_cast<DBImplSecondary*>(db_secondary_);
64 }
65
66 void CheckFileTypeCounts(const std::string& dir, int expected_log,
67 int expected_sst, int expected_manifest) const;
68
69 std::string secondary_path_;
70 std::vector<ColumnFamilyHandle*> handles_secondary_;
71 DB* db_secondary_;
72};
73
74void DBSecondaryTest::OpenSecondary(const Options& options) {
20effc67
TL
75 ASSERT_OK(TryOpenSecondary(options));
76}
77
78Status DBSecondaryTest::TryOpenSecondary(const Options& options) {
494da23a
TL
79 Status s =
80 DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
20effc67 81 return s;
494da23a
TL
82}
83
84void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
85 const std::vector<std::string>& column_families, const Options& options) {
86 std::vector<ColumnFamilyDescriptor> cf_descs;
87 cf_descs.emplace_back(kDefaultColumnFamilyName, options);
88 for (const auto& cf_name : column_families) {
89 cf_descs.emplace_back(cf_name, options);
90 }
91 Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
92 &handles_secondary_, &db_secondary_);
93 ASSERT_OK(s);
94}
95
96void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
97 int expected_log, int expected_sst,
98 int expected_manifest) const {
99 std::vector<std::string> filenames;
20effc67 100 ASSERT_OK(env_->GetChildren(dir, &filenames));
494da23a
TL
101
102 int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
103 for (auto file : filenames) {
104 uint64_t number;
105 FileType type;
106 if (ParseFileName(file, &number, &type)) {
20effc67 107 log_cnt += (type == kWalFile);
494da23a
TL
108 sst_cnt += (type == kTableFile);
109 manifest_cnt += (type == kDescriptorFile);
110 }
111 }
112 ASSERT_EQ(expected_log, log_cnt);
113 ASSERT_EQ(expected_sst, sst_cnt);
114 ASSERT_EQ(expected_manifest, manifest_cnt);
115}
116
117TEST_F(DBSecondaryTest, ReopenAsSecondary) {
118 Options options;
119 options.env = env_;
120 Reopen(options);
121 ASSERT_OK(Put("foo", "foo_value"));
122 ASSERT_OK(Put("bar", "bar_value"));
123 ASSERT_OK(dbfull()->Flush(FlushOptions()));
124 Close();
125
126 ASSERT_OK(ReopenAsSecondary(options));
127 ASSERT_EQ("foo_value", Get("foo"));
128 ASSERT_EQ("bar_value", Get("bar"));
129 ReadOptions ropts;
130 ropts.verify_checksums = true;
131 auto db1 = static_cast<DBImplSecondary*>(db_);
132 ASSERT_NE(nullptr, db1);
133 Iterator* iter = db1->NewIterator(ropts);
134 ASSERT_NE(nullptr, iter);
135 size_t count = 0;
136 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
137 if (0 == count) {
138 ASSERT_EQ("bar", iter->key().ToString());
139 ASSERT_EQ("bar_value", iter->value().ToString());
140 } else if (1 == count) {
141 ASSERT_EQ("foo", iter->key().ToString());
142 ASSERT_EQ("foo_value", iter->value().ToString());
143 }
144 ++count;
145 }
146 delete iter;
147 ASSERT_EQ(2, count);
148}
149
150TEST_F(DBSecondaryTest, OpenAsSecondary) {
151 Options options;
152 options.env = env_;
153 options.level0_file_num_compaction_trigger = 4;
154 Reopen(options);
155 for (int i = 0; i < 3; ++i) {
156 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
157 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
158 ASSERT_OK(Flush());
159 }
160 Options options1;
161 options1.env = env_;
162 options1.max_open_files = -1;
163 OpenSecondary(options1);
164 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
165 ASSERT_OK(dbfull()->TEST_WaitForCompact());
166
167 ReadOptions ropts;
168 ropts.verify_checksums = true;
169 const auto verify_db_func = [&](const std::string& foo_val,
170 const std::string& bar_val) {
171 std::string value;
172 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
173 ASSERT_EQ(foo_val, value);
174 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
175 ASSERT_EQ(bar_val, value);
176 Iterator* iter = db_secondary_->NewIterator(ropts);
177 ASSERT_NE(nullptr, iter);
178 iter->Seek("foo");
179 ASSERT_TRUE(iter->Valid());
180 ASSERT_EQ("foo", iter->key().ToString());
181 ASSERT_EQ(foo_val, iter->value().ToString());
182 iter->Seek("bar");
183 ASSERT_TRUE(iter->Valid());
184 ASSERT_EQ("bar", iter->key().ToString());
185 ASSERT_EQ(bar_val, iter->value().ToString());
186 size_t count = 0;
187 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
188 ++count;
189 }
190 ASSERT_EQ(2, count);
191 delete iter;
192 };
193
194 verify_db_func("foo_value2", "bar_value2");
195
196 ASSERT_OK(Put("foo", "new_foo_value"));
197 ASSERT_OK(Put("bar", "new_bar_value"));
198 ASSERT_OK(Flush());
199
200 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
201 verify_db_func("new_foo_value", "new_bar_value");
202}
203
f67539c2
TL
204namespace {
205class TraceFileEnv : public EnvWrapper {
206 public:
207 explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {}
208 Status NewRandomAccessFile(const std::string& f,
209 std::unique_ptr<RandomAccessFile>* r,
210 const EnvOptions& env_options) override {
211 class TracedRandomAccessFile : public RandomAccessFile {
212 public:
213 TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
214 std::atomic<int>& counter)
215 : target_(std::move(target)), files_closed_(counter) {}
216 ~TracedRandomAccessFile() override {
217 files_closed_.fetch_add(1, std::memory_order_relaxed);
218 }
219 Status Read(uint64_t offset, size_t n, Slice* result,
220 char* scratch) const override {
221 return target_->Read(offset, n, result, scratch);
222 }
223
224 private:
225 std::unique_ptr<RandomAccessFile> target_;
226 std::atomic<int>& files_closed_;
227 };
228 Status s = target()->NewRandomAccessFile(f, r, env_options);
229 if (s.ok()) {
230 r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
231 }
232 return s;
233 }
234
235 int files_closed() const {
236 return files_closed_.load(std::memory_order_relaxed);
237 }
238
239 private:
240 std::atomic<int> files_closed_{0};
241};
242} // namespace
243
244TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
245 Options options;
246 options.env = env_;
247 options.max_open_files = 1;
248 options.disable_auto_compactions = true;
249 Reopen(options);
250 Options options1;
251 std::unique_ptr<Env> traced_env(new TraceFileEnv(env_));
252 options1.env = traced_env.get();
253 OpenSecondary(options1);
254
255 static const auto verify_db = [&]() {
256 std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
257 std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
258 for (iter1->SeekToFirst(), iter2->SeekToFirst();
259 iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
260 ASSERT_EQ(iter1->key(), iter2->key());
261 ASSERT_EQ(iter1->value(), iter2->value());
262 }
263 ASSERT_FALSE(iter1->Valid());
264 ASSERT_FALSE(iter2->Valid());
265 };
266
267 ASSERT_OK(Put("a", "value"));
268 ASSERT_OK(Put("c", "value"));
269 ASSERT_OK(Flush());
270 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
271 verify_db();
272
273 ASSERT_OK(Put("b", "value"));
274 ASSERT_OK(Put("d", "value"));
275 ASSERT_OK(Flush());
276 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
277 verify_db();
278
279 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
280 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
281 ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed());
282
283 Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
284 ASSERT_TRUE(s.IsNotSupported());
285 CloseSecondary();
286}
287
288TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
289 Options options;
290 options.env = env_;
291 options.level0_file_num_compaction_trigger = 4;
292 Reopen(options);
293 for (int i = 0; i < 3; ++i) {
294 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
295 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
296 }
297 Options options1;
298 options1.env = env_;
299 options1.max_open_files = -1;
300 OpenSecondary(options1);
301
302 ReadOptions ropts;
303 ropts.verify_checksums = true;
304 const auto verify_db_func = [&](const std::string& foo_val,
305 const std::string& bar_val) {
306 std::string value;
307 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
308 ASSERT_EQ(foo_val, value);
309 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
310 ASSERT_EQ(bar_val, value);
311 Iterator* iter = db_secondary_->NewIterator(ropts);
312 ASSERT_NE(nullptr, iter);
313 iter->Seek("foo");
314 ASSERT_TRUE(iter->Valid());
315 ASSERT_EQ("foo", iter->key().ToString());
316 ASSERT_EQ(foo_val, iter->value().ToString());
317 iter->Seek("bar");
318 ASSERT_TRUE(iter->Valid());
319 ASSERT_EQ("bar", iter->key().ToString());
320 ASSERT_EQ(bar_val, iter->value().ToString());
321 size_t count = 0;
322 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
323 ++count;
324 }
325 ASSERT_EQ(2, count);
326 delete iter;
327 };
328
329 verify_db_func("foo_value2", "bar_value2");
330
331 ASSERT_OK(Put("foo", "new_foo_value"));
332 ASSERT_OK(Put("bar", "new_bar_value"));
333
334 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
335 verify_db_func("new_foo_value", "new_bar_value");
336
337 ASSERT_OK(Flush());
338 ASSERT_OK(Put("foo", "new_foo_value_1"));
339 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
340 verify_db_func("new_foo_value_1", "new_bar_value");
341}
342
494da23a
TL
343TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
344 Options options;
345 options.env = env_;
346 CreateAndReopenWithCF({"pikachu"}, options);
347
348 Options options1;
349 options1.env = env_;
350 options1.max_open_files = -1;
351 std::vector<ColumnFamilyDescriptor> cf_descs;
352 cf_descs.emplace_back(kDefaultColumnFamilyName, options1);
353 cf_descs.emplace_back("pikachu", options1);
354 cf_descs.emplace_back("eevee", options1);
355 Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs,
356 &handles_secondary_, &db_secondary_);
357 ASSERT_NOK(s);
358}
359
360TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
361 Options options;
362 options.env = env_;
363 CreateAndReopenWithCF({"pikachu"}, options);
364 Options options1;
365 options1.env = env_;
366 options1.max_open_files = -1;
367 OpenSecondary(options1);
368 ASSERT_EQ(0, handles_secondary_.size());
369 ASSERT_NE(nullptr, db_secondary_);
370
371 ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
372 ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
373 ASSERT_OK(Flush(0 /*cf*/));
374 ASSERT_OK(Flush(1 /*cf*/));
375 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
376 ReadOptions ropts;
377 ropts.verify_checksums = true;
378 std::string value;
379 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
380 ASSERT_EQ("foo_value", value);
381}
382
383TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
384 Options options;
385 options.env = env_;
386 Reopen(options);
387 Close();
388
389 SyncPoint::GetInstance()->DisableProcessing();
390 SyncPoint::GetInstance()->ClearAllCallBacks();
391 SyncPoint::GetInstance()->LoadDependency(
392 {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
393 "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
394 {"VersionSet::ProcessManifestWrites:AfterNewManifest",
395 "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
396 "1"}});
397 SyncPoint::GetInstance()->EnableProcessing();
398
399 // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
400 // which causes the db to switch to a new MANIFEST upon start.
401 port::Thread ro_db_thread([&]() {
402 Options options1;
403 options1.env = env_;
404 options1.max_open_files = -1;
405 OpenSecondary(options1);
406 CloseSecondary();
407 });
408 Reopen(options);
409 ro_db_thread.join();
410}
411
412TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
413 Options options;
414 options.env = env_;
415 options.level0_file_num_compaction_trigger = 4;
416 Reopen(options);
417 for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
418 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
419 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
420 ASSERT_OK(dbfull()->Flush(FlushOptions()));
421 }
422 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
423 ASSERT_OK(dbfull()->TEST_WaitForCompact());
424 Options options1;
425 options1.env = env_;
426 options1.max_open_files = -1;
427 OpenSecondary(options1);
428 ReadOptions ropts;
429 ropts.verify_checksums = true;
430 std::string value;
431 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
432 ASSERT_EQ("foo_value" +
433 std::to_string(options.level0_file_num_compaction_trigger - 1),
434 value);
435 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
436 ASSERT_EQ("bar_value" +
437 std::to_string(options.level0_file_num_compaction_trigger - 1),
438 value);
439 Iterator* iter = db_secondary_->NewIterator(ropts);
440 ASSERT_NE(nullptr, iter);
441 iter->Seek("bar");
442 ASSERT_TRUE(iter->Valid());
443 ASSERT_EQ("bar", iter->key().ToString());
444 ASSERT_EQ("bar_value" +
445 std::to_string(options.level0_file_num_compaction_trigger - 1),
446 iter->value().ToString());
447 iter->Seek("foo");
448 ASSERT_TRUE(iter->Valid());
449 ASSERT_EQ("foo", iter->key().ToString());
450 ASSERT_EQ("foo_value" +
451 std::to_string(options.level0_file_num_compaction_trigger - 1),
452 iter->value().ToString());
453 size_t count = 0;
454 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
455 ++count;
456 }
457 ASSERT_EQ(2, count);
458 delete iter;
459}
460
461TEST_F(DBSecondaryTest, MissingTableFile) {
462 int table_files_not_exist = 0;
463 SyncPoint::GetInstance()->DisableProcessing();
464 SyncPoint::GetInstance()->ClearAllCallBacks();
465 SyncPoint::GetInstance()->SetCallBack(
f67539c2 466 "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers",
494da23a
TL
467 [&](void* arg) {
468 Status s = *reinterpret_cast<Status*>(arg);
469 if (s.IsPathNotFound()) {
470 ++table_files_not_exist;
471 } else if (!s.ok()) {
472 assert(false); // Should not reach here
473 }
474 });
475 SyncPoint::GetInstance()->EnableProcessing();
476 Options options;
477 options.env = env_;
478 options.level0_file_num_compaction_trigger = 4;
479 Reopen(options);
480
481 Options options1;
482 options1.env = env_;
483 options1.max_open_files = -1;
484 OpenSecondary(options1);
485
486 for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
487 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
488 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
489 ASSERT_OK(dbfull()->Flush(FlushOptions()));
490 }
491 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
492 ASSERT_OK(dbfull()->TEST_WaitForCompact());
493
494 ASSERT_NE(nullptr, db_secondary_full());
495 ReadOptions ropts;
496 ropts.verify_checksums = true;
497 std::string value;
498 ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
499 ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));
500
501 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
502 ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
503 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
504 ASSERT_EQ("foo_value" +
505 std::to_string(options.level0_file_num_compaction_trigger - 1),
506 value);
507 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
508 ASSERT_EQ("bar_value" +
509 std::to_string(options.level0_file_num_compaction_trigger - 1),
510 value);
511 Iterator* iter = db_secondary_->NewIterator(ropts);
512 ASSERT_NE(nullptr, iter);
513 iter->Seek("bar");
514 ASSERT_TRUE(iter->Valid());
515 ASSERT_EQ("bar", iter->key().ToString());
516 ASSERT_EQ("bar_value" +
517 std::to_string(options.level0_file_num_compaction_trigger - 1),
518 iter->value().ToString());
519 iter->Seek("foo");
520 ASSERT_TRUE(iter->Valid());
521 ASSERT_EQ("foo", iter->key().ToString());
522 ASSERT_EQ("foo_value" +
523 std::to_string(options.level0_file_num_compaction_trigger - 1),
524 iter->value().ToString());
525 size_t count = 0;
526 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
527 ++count;
528 }
529 ASSERT_EQ(2, count);
530 delete iter;
531}
532
533TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
534 Options options;
535 options.env = env_;
536 const std::string kCfName1 = "pikachu";
537 CreateAndReopenWithCF({kCfName1}, options);
538
539 Options options1;
540 options1.env = env_;
541 options1.max_open_files = -1;
542 OpenSecondaryWithColumnFamilies({kCfName1}, options1);
543 ASSERT_EQ(2, handles_secondary_.size());
544
545 ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
546 ASSERT_OK(Flush(1 /*cf*/));
547
548 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
549 ReadOptions ropts;
550 ropts.verify_checksums = true;
551 std::string value;
552 ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
553 ASSERT_EQ("foo_val_1", value);
554
555 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
556 Close();
557 CheckFileTypeCounts(dbname_, 1, 0, 1);
558 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
559 value.clear();
560 ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
561 ASSERT_EQ("foo_val_1", value);
562}
563
564TEST_F(DBSecondaryTest, SwitchManifest) {
565 Options options;
566 options.env = env_;
567 options.level0_file_num_compaction_trigger = 4;
568 Reopen(options);
569
570 Options options1;
571 options1.env = env_;
572 options1.max_open_files = -1;
573 OpenSecondary(options1);
574
575 const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
576 // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
577 // ..., 9.
578 const int kNumKeys = 10;
579 // Create two sst
580 for (int i = 0; i != kNumFiles; ++i) {
581 for (int j = 0; j != kNumKeys; ++j) {
582 ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
583 }
584 ASSERT_OK(Flush());
585 }
586
587 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
588 const auto& range_scan_db = [&]() {
589 ReadOptions tmp_ropts;
590 tmp_ropts.total_order_seek = true;
591 tmp_ropts.verify_checksums = true;
592 std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
593 int cnt = 0;
594 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
595 ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
596 ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
597 iter->value().ToString());
598 }
599 };
600
601 range_scan_db();
602
603 // While secondary instance still keeps old MANIFEST open, we close primary,
604 // restart primary, performs full compaction, close again, restart again so
605 // that next time secondary tries to catch up with primary, the secondary
606 // will skip the MANIFEST in middle.
607 Reopen(options);
608 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
609 ASSERT_OK(dbfull()->TEST_WaitForCompact());
610
611 Reopen(options);
612 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
613
614 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
615 range_scan_db();
616}
f67539c2
TL
617
618// Here, "Snapshot" refers to the version edits written by
619// VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after
620// switching from the old one.
621TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {
622 Options options;
623 options.env = env_;
624 options.disable_auto_compactions = true;
625 Reopen(options);
626
627 Options options1;
628 options1.env = env_;
629 options1.max_open_files = -1;
630 OpenSecondary(options1);
631
632 ASSERT_OK(Put("0", "value0"));
633 ASSERT_OK(Flush());
634 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
635 std::string value;
636 ReadOptions ropts;
637 ropts.verify_checksums = true;
638 ASSERT_OK(db_secondary_->Get(ropts, "0", &value));
639 ASSERT_EQ("value0", value);
640
641 Reopen(options);
642 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
643 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
644}
645
646TEST_F(DBSecondaryTest, SwitchWAL) {
647 const int kNumKeysPerMemtable = 1;
648 Options options;
649 options.env = env_;
650 options.max_write_buffer_number = 4;
651 options.min_write_buffer_number_to_merge = 2;
652 options.memtable_factory.reset(
653 new SpecialSkipListFactory(kNumKeysPerMemtable));
654 Reopen(options);
655
656 Options options1;
657 options1.env = env_;
658 options1.max_open_files = -1;
659 OpenSecondary(options1);
660
661 const auto& verify_db = [](DB* db1, DB* db2) {
662 ASSERT_NE(nullptr, db1);
663 ASSERT_NE(nullptr, db2);
664 ReadOptions read_opts;
665 read_opts.verify_checksums = true;
666 std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts));
667 std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts));
668 it1->SeekToFirst();
669 it2->SeekToFirst();
670 for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
671 ASSERT_EQ(it1->key(), it2->key());
672 ASSERT_EQ(it1->value(), it2->value());
673 }
674 ASSERT_FALSE(it1->Valid());
675 ASSERT_FALSE(it2->Valid());
676
677 for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
678 std::string value;
679 ASSERT_OK(db2->Get(read_opts, it1->key(), &value));
680 ASSERT_EQ(it1->value(), value);
681 }
682 for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
683 std::string value;
684 ASSERT_OK(db1->Get(read_opts, it2->key(), &value));
685 ASSERT_EQ(it2->value(), value);
686 }
687 };
688 for (int k = 0; k != 16; ++k) {
689 ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k)));
690 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
691 verify_db(dbfull(), db_secondary_);
692 }
693}
694
695TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
696 const int kNumKeysPerMemtable = 1;
697 SyncPoint::GetInstance()->DisableProcessing();
698 SyncPoint::GetInstance()->LoadDependency(
699 {{"DBImpl::BackgroundCallFlush:ContextCleanedUp",
700 "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}});
701 SyncPoint::GetInstance()->EnableProcessing();
702 const std::string kCFName1 = "pikachu";
703 Options options;
704 options.env = env_;
705 options.max_write_buffer_number = 4;
706 options.min_write_buffer_number_to_merge = 2;
707 options.memtable_factory.reset(
708 new SpecialSkipListFactory(kNumKeysPerMemtable));
709 CreateAndReopenWithCF({kCFName1}, options);
710
711 Options options1;
712 options1.env = env_;
713 options1.max_open_files = -1;
714 OpenSecondaryWithColumnFamilies({kCFName1}, options1);
715 ASSERT_EQ(2, handles_secondary_.size());
716
717 const auto& verify_db = [](DB* db1,
718 const std::vector<ColumnFamilyHandle*>& handles1,
719 DB* db2,
720 const std::vector<ColumnFamilyHandle*>& handles2) {
721 ASSERT_NE(nullptr, db1);
722 ASSERT_NE(nullptr, db2);
723 ReadOptions read_opts;
724 read_opts.verify_checksums = true;
725 ASSERT_EQ(handles1.size(), handles2.size());
726 for (size_t i = 0; i != handles1.size(); ++i) {
727 std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i]));
728 std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i]));
729 it1->SeekToFirst();
730 it2->SeekToFirst();
731 for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
732 ASSERT_EQ(it1->key(), it2->key());
733 ASSERT_EQ(it1->value(), it2->value());
734 }
735 ASSERT_FALSE(it1->Valid());
736 ASSERT_FALSE(it2->Valid());
737
738 for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
739 std::string value;
740 ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value));
741 ASSERT_EQ(it1->value(), value);
742 }
743 for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
744 std::string value;
745 ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value));
746 ASSERT_EQ(it2->value(), value);
747 }
748 }
749 };
750 for (int k = 0; k != 8; ++k) {
751 ASSERT_OK(
752 Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
753 ASSERT_OK(
754 Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
755 TEST_SYNC_POINT(
756 "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
757 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
758 verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
759 SyncPoint::GetInstance()->ClearTrace();
760 }
761}
762
763TEST_F(DBSecondaryTest, CatchUpAfterFlush) {
764 const int kNumKeysPerMemtable = 16;
765 Options options;
766 options.env = env_;
767 options.max_write_buffer_number = 4;
768 options.min_write_buffer_number_to_merge = 2;
769 options.memtable_factory.reset(
770 new SpecialSkipListFactory(kNumKeysPerMemtable));
771 Reopen(options);
772
773 Options options1;
774 options1.env = env_;
775 options1.max_open_files = -1;
776 OpenSecondary(options1);
777
778 WriteOptions write_opts;
779 WriteBatch wb;
20effc67
TL
780 ASSERT_OK(wb.Put("key0", "value0"));
781 ASSERT_OK(wb.Put("key1", "value1"));
f67539c2
TL
782 ASSERT_OK(dbfull()->Write(write_opts, &wb));
783 ReadOptions read_opts;
784 std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts));
785 iter1->Seek("key0");
786 ASSERT_FALSE(iter1->Valid());
787 iter1->Seek("key1");
788 ASSERT_FALSE(iter1->Valid());
789 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
790 iter1->Seek("key0");
791 ASSERT_FALSE(iter1->Valid());
792 iter1->Seek("key1");
793 ASSERT_FALSE(iter1->Valid());
20effc67 794 ASSERT_OK(iter1->status());
f67539c2
TL
795 std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts));
796 iter2->Seek("key0");
797 ASSERT_TRUE(iter2->Valid());
798 ASSERT_EQ("value0", iter2->value());
799 iter2->Seek("key1");
800 ASSERT_TRUE(iter2->Valid());
20effc67 801 ASSERT_OK(iter2->status());
f67539c2
TL
802 ASSERT_EQ("value1", iter2->value());
803
804 {
805 WriteBatch wb1;
20effc67
TL
806 ASSERT_OK(wb1.Put("key0", "value01"));
807 ASSERT_OK(wb1.Put("key1", "value11"));
f67539c2
TL
808 ASSERT_OK(dbfull()->Write(write_opts, &wb1));
809 }
810
811 {
812 WriteBatch wb2;
20effc67
TL
813 ASSERT_OK(wb2.Put("key0", "new_value0"));
814 ASSERT_OK(wb2.Delete("key1"));
f67539c2
TL
815 ASSERT_OK(dbfull()->Write(write_opts, &wb2));
816 }
817
818 ASSERT_OK(Flush());
819
820 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
821 std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts));
822 // iter3 should not see value01 and value11 at all.
823 iter3->Seek("key0");
824 ASSERT_TRUE(iter3->Valid());
825 ASSERT_EQ("new_value0", iter3->value());
826 iter3->Seek("key1");
827 ASSERT_FALSE(iter3->Valid());
20effc67 828 ASSERT_OK(iter3->status());
f67539c2
TL
829}
830
831TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
832 bool called = false;
833 Options options;
834 options.env = env_;
835 options.disable_auto_compactions = true;
836 Reopen(options);
837 SyncPoint::GetInstance()->DisableProcessing();
838 SyncPoint::GetInstance()->ClearAllCallBacks();
839 SyncPoint::GetInstance()->SetCallBack(
840 "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) {
841 ASSERT_NE(nullptr, arg);
842 called = true;
843 auto* s = reinterpret_cast<Status*>(arg);
844 ASSERT_NOK(*s);
845 });
846 SyncPoint::GetInstance()->LoadDependency(
847 {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData",
848 "BackgroundCallCompaction:0"},
849 {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
850 "DBImpl::CheckConsistency:BeforeGetFileSize"}});
851 SyncPoint::GetInstance()->EnableProcessing();
852
853 ASSERT_OK(Put("a", "value0"));
854 ASSERT_OK(Put("c", "value0"));
855 ASSERT_OK(Flush());
856 ASSERT_OK(Put("b", "value1"));
857 ASSERT_OK(Put("d", "value1"));
858 ASSERT_OK(Flush());
859 port::Thread thread([this]() {
860 Options opts;
861 opts.env = env_;
862 opts.max_open_files = -1;
863 OpenSecondary(opts);
864 });
865 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
866 ASSERT_OK(dbfull()->TEST_WaitForCompact());
867 thread.join();
868 ASSERT_TRUE(called);
869}
20effc67
TL
870
871TEST_F(DBSecondaryTest, StartFromInconsistent) {
872 Options options = CurrentOptions();
873 DestroyAndReopen(options);
874 ASSERT_OK(Put("foo", "value"));
875 ASSERT_OK(Flush());
876 SyncPoint::GetInstance()->DisableProcessing();
877 SyncPoint::GetInstance()->ClearAllCallBacks();
878 SyncPoint::GetInstance()->SetCallBack(
879 "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
880 ASSERT_NE(nullptr, arg);
881 *(reinterpret_cast<Status*>(arg)) =
882 Status::Corruption("Inject corruption");
883 });
884 SyncPoint::GetInstance()->EnableProcessing();
885 Options options1;
886 options1.env = env_;
887 Status s = TryOpenSecondary(options1);
888 ASSERT_TRUE(s.IsCorruption());
889}
890
891TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) {
892 Options options = CurrentOptions();
893 DestroyAndReopen(options);
894 ASSERT_OK(Put("foo", "value"));
895 ASSERT_OK(Flush());
896
897 Options options1;
898 options1.env = env_;
899 OpenSecondary(options1);
900
901 {
902 std::string value;
903 ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value));
904 ASSERT_EQ("value", value);
905 }
906
907 ASSERT_OK(Put("bar", "value1"));
908 ASSERT_OK(Flush());
909
910 SyncPoint::GetInstance()->DisableProcessing();
911 SyncPoint::GetInstance()->ClearAllCallBacks();
912 SyncPoint::GetInstance()->SetCallBack(
913 "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
914 ASSERT_NE(nullptr, arg);
915 *(reinterpret_cast<Status*>(arg)) =
916 Status::Corruption("Inject corruption");
917 });
918 SyncPoint::GetInstance()->EnableProcessing();
919 Status s = db_secondary_->TryCatchUpWithPrimary();
920 ASSERT_TRUE(s.IsCorruption());
921}
494da23a
TL
922#endif //! ROCKSDB_LITE
923
f67539c2 924} // namespace ROCKSDB_NAMESPACE
494da23a
TL
925
926int main(int argc, char** argv) {
f67539c2 927 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
494da23a
TL
928 ::testing::InitGoogleTest(&argc, argv);
929 return RUN_ALL_TESTS();
930}