]>
Commit | Line | Data |
---|---|---|
494da23a TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
f67539c2 | 10 | #include "db/db_impl/db_impl_secondary.h" |
494da23a TL |
11 | #include "db/db_test_util.h" |
12 | #include "port/stack_trace.h" | |
f67539c2 | 13 | #include "test_util/sync_point.h" |
20effc67 | 14 | #include "utilities/fault_injection_env.h" |
494da23a | 15 | |
f67539c2 | 16 | namespace ROCKSDB_NAMESPACE { |
494da23a TL |
17 | |
18 | #ifndef ROCKSDB_LITE | |
19 | class DBSecondaryTest : public DBTestBase { | |
20 | public: | |
21 | DBSecondaryTest() | |
20effc67 | 22 | : DBTestBase("/db_secondary_test", /*env_do_fsync=*/true), |
494da23a TL |
23 | secondary_path_(), |
24 | handles_secondary_(), | |
25 | db_secondary_(nullptr) { | |
26 | secondary_path_ = | |
27 | test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); | |
28 | } | |
29 | ||
30 | ~DBSecondaryTest() override { | |
31 | CloseSecondary(); | |
32 | if (getenv("KEEP_DB") != nullptr) { | |
33 | fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str()); | |
34 | } else { | |
35 | Options options; | |
36 | options.env = env_; | |
37 | EXPECT_OK(DestroyDB(secondary_path_, options)); | |
38 | } | |
39 | } | |
40 | ||
41 | protected: | |
42 | Status ReopenAsSecondary(const Options& options) { | |
43 | return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_); | |
44 | } | |
45 | ||
46 | void OpenSecondary(const Options& options); | |
47 | ||
20effc67 TL |
48 | Status TryOpenSecondary(const Options& options); |
49 | ||
494da23a TL |
50 | void OpenSecondaryWithColumnFamilies( |
51 | const std::vector<std::string>& column_families, const Options& options); | |
52 | ||
53 | void CloseSecondary() { | |
54 | for (auto h : handles_secondary_) { | |
20effc67 | 55 | ASSERT_OK(db_secondary_->DestroyColumnFamilyHandle(h)); |
494da23a TL |
56 | } |
57 | handles_secondary_.clear(); | |
58 | delete db_secondary_; | |
59 | db_secondary_ = nullptr; | |
60 | } | |
61 | ||
62 | DBImplSecondary* db_secondary_full() { | |
63 | return static_cast<DBImplSecondary*>(db_secondary_); | |
64 | } | |
65 | ||
66 | void CheckFileTypeCounts(const std::string& dir, int expected_log, | |
67 | int expected_sst, int expected_manifest) const; | |
68 | ||
69 | std::string secondary_path_; | |
70 | std::vector<ColumnFamilyHandle*> handles_secondary_; | |
71 | DB* db_secondary_; | |
72 | }; | |
73 | ||
74 | void DBSecondaryTest::OpenSecondary(const Options& options) { | |
20effc67 TL |
75 | ASSERT_OK(TryOpenSecondary(options)); |
76 | } | |
77 | ||
78 | Status DBSecondaryTest::TryOpenSecondary(const Options& options) { | |
494da23a TL |
79 | Status s = |
80 | DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); | |
20effc67 | 81 | return s; |
494da23a TL |
82 | } |
83 | ||
84 | void DBSecondaryTest::OpenSecondaryWithColumnFamilies( | |
85 | const std::vector<std::string>& column_families, const Options& options) { | |
86 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
87 | cf_descs.emplace_back(kDefaultColumnFamilyName, options); | |
88 | for (const auto& cf_name : column_families) { | |
89 | cf_descs.emplace_back(cf_name, options); | |
90 | } | |
91 | Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs, | |
92 | &handles_secondary_, &db_secondary_); | |
93 | ASSERT_OK(s); | |
94 | } | |
95 | ||
96 | void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir, | |
97 | int expected_log, int expected_sst, | |
98 | int expected_manifest) const { | |
99 | std::vector<std::string> filenames; | |
20effc67 | 100 | ASSERT_OK(env_->GetChildren(dir, &filenames)); |
494da23a TL |
101 | |
102 | int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0; | |
103 | for (auto file : filenames) { | |
104 | uint64_t number; | |
105 | FileType type; | |
106 | if (ParseFileName(file, &number, &type)) { | |
20effc67 | 107 | log_cnt += (type == kWalFile); |
494da23a TL |
108 | sst_cnt += (type == kTableFile); |
109 | manifest_cnt += (type == kDescriptorFile); | |
110 | } | |
111 | } | |
112 | ASSERT_EQ(expected_log, log_cnt); | |
113 | ASSERT_EQ(expected_sst, sst_cnt); | |
114 | ASSERT_EQ(expected_manifest, manifest_cnt); | |
115 | } | |
116 | ||
117 | TEST_F(DBSecondaryTest, ReopenAsSecondary) { | |
118 | Options options; | |
119 | options.env = env_; | |
120 | Reopen(options); | |
121 | ASSERT_OK(Put("foo", "foo_value")); | |
122 | ASSERT_OK(Put("bar", "bar_value")); | |
123 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
124 | Close(); | |
125 | ||
126 | ASSERT_OK(ReopenAsSecondary(options)); | |
127 | ASSERT_EQ("foo_value", Get("foo")); | |
128 | ASSERT_EQ("bar_value", Get("bar")); | |
129 | ReadOptions ropts; | |
130 | ropts.verify_checksums = true; | |
131 | auto db1 = static_cast<DBImplSecondary*>(db_); | |
132 | ASSERT_NE(nullptr, db1); | |
133 | Iterator* iter = db1->NewIterator(ropts); | |
134 | ASSERT_NE(nullptr, iter); | |
135 | size_t count = 0; | |
136 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
137 | if (0 == count) { | |
138 | ASSERT_EQ("bar", iter->key().ToString()); | |
139 | ASSERT_EQ("bar_value", iter->value().ToString()); | |
140 | } else if (1 == count) { | |
141 | ASSERT_EQ("foo", iter->key().ToString()); | |
142 | ASSERT_EQ("foo_value", iter->value().ToString()); | |
143 | } | |
144 | ++count; | |
145 | } | |
146 | delete iter; | |
147 | ASSERT_EQ(2, count); | |
148 | } | |
149 | ||
150 | TEST_F(DBSecondaryTest, OpenAsSecondary) { | |
151 | Options options; | |
152 | options.env = env_; | |
153 | options.level0_file_num_compaction_trigger = 4; | |
154 | Reopen(options); | |
155 | for (int i = 0; i < 3; ++i) { | |
156 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
157 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
158 | ASSERT_OK(Flush()); | |
159 | } | |
160 | Options options1; | |
161 | options1.env = env_; | |
162 | options1.max_open_files = -1; | |
163 | OpenSecondary(options1); | |
164 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
165 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
166 | ||
167 | ReadOptions ropts; | |
168 | ropts.verify_checksums = true; | |
169 | const auto verify_db_func = [&](const std::string& foo_val, | |
170 | const std::string& bar_val) { | |
171 | std::string value; | |
172 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
173 | ASSERT_EQ(foo_val, value); | |
174 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
175 | ASSERT_EQ(bar_val, value); | |
176 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
177 | ASSERT_NE(nullptr, iter); | |
178 | iter->Seek("foo"); | |
179 | ASSERT_TRUE(iter->Valid()); | |
180 | ASSERT_EQ("foo", iter->key().ToString()); | |
181 | ASSERT_EQ(foo_val, iter->value().ToString()); | |
182 | iter->Seek("bar"); | |
183 | ASSERT_TRUE(iter->Valid()); | |
184 | ASSERT_EQ("bar", iter->key().ToString()); | |
185 | ASSERT_EQ(bar_val, iter->value().ToString()); | |
186 | size_t count = 0; | |
187 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
188 | ++count; | |
189 | } | |
190 | ASSERT_EQ(2, count); | |
191 | delete iter; | |
192 | }; | |
193 | ||
194 | verify_db_func("foo_value2", "bar_value2"); | |
195 | ||
196 | ASSERT_OK(Put("foo", "new_foo_value")); | |
197 | ASSERT_OK(Put("bar", "new_bar_value")); | |
198 | ASSERT_OK(Flush()); | |
199 | ||
200 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
201 | verify_db_func("new_foo_value", "new_bar_value"); | |
202 | } | |
203 | ||
f67539c2 TL |
204 | namespace { |
205 | class TraceFileEnv : public EnvWrapper { | |
206 | public: | |
207 | explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {} | |
208 | Status NewRandomAccessFile(const std::string& f, | |
209 | std::unique_ptr<RandomAccessFile>* r, | |
210 | const EnvOptions& env_options) override { | |
211 | class TracedRandomAccessFile : public RandomAccessFile { | |
212 | public: | |
213 | TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target, | |
214 | std::atomic<int>& counter) | |
215 | : target_(std::move(target)), files_closed_(counter) {} | |
216 | ~TracedRandomAccessFile() override { | |
217 | files_closed_.fetch_add(1, std::memory_order_relaxed); | |
218 | } | |
219 | Status Read(uint64_t offset, size_t n, Slice* result, | |
220 | char* scratch) const override { | |
221 | return target_->Read(offset, n, result, scratch); | |
222 | } | |
223 | ||
224 | private: | |
225 | std::unique_ptr<RandomAccessFile> target_; | |
226 | std::atomic<int>& files_closed_; | |
227 | }; | |
228 | Status s = target()->NewRandomAccessFile(f, r, env_options); | |
229 | if (s.ok()) { | |
230 | r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_)); | |
231 | } | |
232 | return s; | |
233 | } | |
234 | ||
235 | int files_closed() const { | |
236 | return files_closed_.load(std::memory_order_relaxed); | |
237 | } | |
238 | ||
239 | private: | |
240 | std::atomic<int> files_closed_{0}; | |
241 | }; | |
242 | } // namespace | |
243 | ||
244 | TEST_F(DBSecondaryTest, SecondaryCloseFiles) { | |
245 | Options options; | |
246 | options.env = env_; | |
247 | options.max_open_files = 1; | |
248 | options.disable_auto_compactions = true; | |
249 | Reopen(options); | |
250 | Options options1; | |
251 | std::unique_ptr<Env> traced_env(new TraceFileEnv(env_)); | |
252 | options1.env = traced_env.get(); | |
253 | OpenSecondary(options1); | |
254 | ||
255 | static const auto verify_db = [&]() { | |
256 | std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions())); | |
257 | std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions())); | |
258 | for (iter1->SeekToFirst(), iter2->SeekToFirst(); | |
259 | iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { | |
260 | ASSERT_EQ(iter1->key(), iter2->key()); | |
261 | ASSERT_EQ(iter1->value(), iter2->value()); | |
262 | } | |
263 | ASSERT_FALSE(iter1->Valid()); | |
264 | ASSERT_FALSE(iter2->Valid()); | |
265 | }; | |
266 | ||
267 | ASSERT_OK(Put("a", "value")); | |
268 | ASSERT_OK(Put("c", "value")); | |
269 | ASSERT_OK(Flush()); | |
270 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
271 | verify_db(); | |
272 | ||
273 | ASSERT_OK(Put("b", "value")); | |
274 | ASSERT_OK(Put("d", "value")); | |
275 | ASSERT_OK(Flush()); | |
276 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
277 | verify_db(); | |
278 | ||
279 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
280 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
281 | ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed()); | |
282 | ||
283 | Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}}); | |
284 | ASSERT_TRUE(s.IsNotSupported()); | |
285 | CloseSecondary(); | |
286 | } | |
287 | ||
288 | TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { | |
289 | Options options; | |
290 | options.env = env_; | |
291 | options.level0_file_num_compaction_trigger = 4; | |
292 | Reopen(options); | |
293 | for (int i = 0; i < 3; ++i) { | |
294 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
295 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
296 | } | |
297 | Options options1; | |
298 | options1.env = env_; | |
299 | options1.max_open_files = -1; | |
300 | OpenSecondary(options1); | |
301 | ||
302 | ReadOptions ropts; | |
303 | ropts.verify_checksums = true; | |
304 | const auto verify_db_func = [&](const std::string& foo_val, | |
305 | const std::string& bar_val) { | |
306 | std::string value; | |
307 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
308 | ASSERT_EQ(foo_val, value); | |
309 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
310 | ASSERT_EQ(bar_val, value); | |
311 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
312 | ASSERT_NE(nullptr, iter); | |
313 | iter->Seek("foo"); | |
314 | ASSERT_TRUE(iter->Valid()); | |
315 | ASSERT_EQ("foo", iter->key().ToString()); | |
316 | ASSERT_EQ(foo_val, iter->value().ToString()); | |
317 | iter->Seek("bar"); | |
318 | ASSERT_TRUE(iter->Valid()); | |
319 | ASSERT_EQ("bar", iter->key().ToString()); | |
320 | ASSERT_EQ(bar_val, iter->value().ToString()); | |
321 | size_t count = 0; | |
322 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
323 | ++count; | |
324 | } | |
325 | ASSERT_EQ(2, count); | |
326 | delete iter; | |
327 | }; | |
328 | ||
329 | verify_db_func("foo_value2", "bar_value2"); | |
330 | ||
331 | ASSERT_OK(Put("foo", "new_foo_value")); | |
332 | ASSERT_OK(Put("bar", "new_bar_value")); | |
333 | ||
334 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
335 | verify_db_func("new_foo_value", "new_bar_value"); | |
336 | ||
337 | ASSERT_OK(Flush()); | |
338 | ASSERT_OK(Put("foo", "new_foo_value_1")); | |
339 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
340 | verify_db_func("new_foo_value_1", "new_bar_value"); | |
341 | } | |
342 | ||
494da23a TL |
343 | TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { |
344 | Options options; | |
345 | options.env = env_; | |
346 | CreateAndReopenWithCF({"pikachu"}, options); | |
347 | ||
348 | Options options1; | |
349 | options1.env = env_; | |
350 | options1.max_open_files = -1; | |
351 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
352 | cf_descs.emplace_back(kDefaultColumnFamilyName, options1); | |
353 | cf_descs.emplace_back("pikachu", options1); | |
354 | cf_descs.emplace_back("eevee", options1); | |
355 | Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs, | |
356 | &handles_secondary_, &db_secondary_); | |
357 | ASSERT_NOK(s); | |
358 | } | |
359 | ||
360 | TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) { | |
361 | Options options; | |
362 | options.env = env_; | |
363 | CreateAndReopenWithCF({"pikachu"}, options); | |
364 | Options options1; | |
365 | options1.env = env_; | |
366 | options1.max_open_files = -1; | |
367 | OpenSecondary(options1); | |
368 | ASSERT_EQ(0, handles_secondary_.size()); | |
369 | ASSERT_NE(nullptr, db_secondary_); | |
370 | ||
371 | ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value")); | |
372 | ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value")); | |
373 | ASSERT_OK(Flush(0 /*cf*/)); | |
374 | ASSERT_OK(Flush(1 /*cf*/)); | |
375 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
376 | ReadOptions ropts; | |
377 | ropts.verify_checksums = true; | |
378 | std::string value; | |
379 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
380 | ASSERT_EQ("foo_value", value); | |
381 | } | |
382 | ||
383 | TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) { | |
384 | Options options; | |
385 | options.env = env_; | |
386 | Reopen(options); | |
387 | Close(); | |
388 | ||
389 | SyncPoint::GetInstance()->DisableProcessing(); | |
390 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
391 | SyncPoint::GetInstance()->LoadDependency( | |
392 | {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0", | |
393 | "VersionSet::ProcessManifestWrites:BeforeNewManifest"}, | |
394 | {"VersionSet::ProcessManifestWrites:AfterNewManifest", | |
395 | "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:" | |
396 | "1"}}); | |
397 | SyncPoint::GetInstance()->EnableProcessing(); | |
398 | ||
399 | // Make sure db calls RecoverLogFiles so as to trigger a manifest write, | |
400 | // which causes the db to switch to a new MANIFEST upon start. | |
401 | port::Thread ro_db_thread([&]() { | |
402 | Options options1; | |
403 | options1.env = env_; | |
404 | options1.max_open_files = -1; | |
405 | OpenSecondary(options1); | |
406 | CloseSecondary(); | |
407 | }); | |
408 | Reopen(options); | |
409 | ro_db_thread.join(); | |
410 | } | |
411 | ||
412 | TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { | |
413 | Options options; | |
414 | options.env = env_; | |
415 | options.level0_file_num_compaction_trigger = 4; | |
416 | Reopen(options); | |
417 | for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { | |
418 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
419 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
420 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
421 | } | |
422 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); | |
423 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
424 | Options options1; | |
425 | options1.env = env_; | |
426 | options1.max_open_files = -1; | |
427 | OpenSecondary(options1); | |
428 | ReadOptions ropts; | |
429 | ropts.verify_checksums = true; | |
430 | std::string value; | |
431 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
432 | ASSERT_EQ("foo_value" + | |
433 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
434 | value); | |
435 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
436 | ASSERT_EQ("bar_value" + | |
437 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
438 | value); | |
439 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
440 | ASSERT_NE(nullptr, iter); | |
441 | iter->Seek("bar"); | |
442 | ASSERT_TRUE(iter->Valid()); | |
443 | ASSERT_EQ("bar", iter->key().ToString()); | |
444 | ASSERT_EQ("bar_value" + | |
445 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
446 | iter->value().ToString()); | |
447 | iter->Seek("foo"); | |
448 | ASSERT_TRUE(iter->Valid()); | |
449 | ASSERT_EQ("foo", iter->key().ToString()); | |
450 | ASSERT_EQ("foo_value" + | |
451 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
452 | iter->value().ToString()); | |
453 | size_t count = 0; | |
454 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
455 | ++count; | |
456 | } | |
457 | ASSERT_EQ(2, count); | |
458 | delete iter; | |
459 | } | |
460 | ||
461 | TEST_F(DBSecondaryTest, MissingTableFile) { | |
462 | int table_files_not_exist = 0; | |
463 | SyncPoint::GetInstance()->DisableProcessing(); | |
464 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
465 | SyncPoint::GetInstance()->SetCallBack( | |
f67539c2 | 466 | "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers", |
494da23a TL |
467 | [&](void* arg) { |
468 | Status s = *reinterpret_cast<Status*>(arg); | |
469 | if (s.IsPathNotFound()) { | |
470 | ++table_files_not_exist; | |
471 | } else if (!s.ok()) { | |
472 | assert(false); // Should not reach here | |
473 | } | |
474 | }); | |
475 | SyncPoint::GetInstance()->EnableProcessing(); | |
476 | Options options; | |
477 | options.env = env_; | |
478 | options.level0_file_num_compaction_trigger = 4; | |
479 | Reopen(options); | |
480 | ||
481 | Options options1; | |
482 | options1.env = env_; | |
483 | options1.max_open_files = -1; | |
484 | OpenSecondary(options1); | |
485 | ||
486 | for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { | |
487 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
488 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
489 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
490 | } | |
491 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); | |
492 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
493 | ||
494 | ASSERT_NE(nullptr, db_secondary_full()); | |
495 | ReadOptions ropts; | |
496 | ropts.verify_checksums = true; | |
497 | std::string value; | |
498 | ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value)); | |
499 | ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value)); | |
500 | ||
501 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
502 | ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist); | |
503 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
504 | ASSERT_EQ("foo_value" + | |
505 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
506 | value); | |
507 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
508 | ASSERT_EQ("bar_value" + | |
509 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
510 | value); | |
511 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
512 | ASSERT_NE(nullptr, iter); | |
513 | iter->Seek("bar"); | |
514 | ASSERT_TRUE(iter->Valid()); | |
515 | ASSERT_EQ("bar", iter->key().ToString()); | |
516 | ASSERT_EQ("bar_value" + | |
517 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
518 | iter->value().ToString()); | |
519 | iter->Seek("foo"); | |
520 | ASSERT_TRUE(iter->Valid()); | |
521 | ASSERT_EQ("foo", iter->key().ToString()); | |
522 | ASSERT_EQ("foo_value" + | |
523 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
524 | iter->value().ToString()); | |
525 | size_t count = 0; | |
526 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
527 | ++count; | |
528 | } | |
529 | ASSERT_EQ(2, count); | |
530 | delete iter; | |
531 | } | |
532 | ||
533 | TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) { | |
534 | Options options; | |
535 | options.env = env_; | |
536 | const std::string kCfName1 = "pikachu"; | |
537 | CreateAndReopenWithCF({kCfName1}, options); | |
538 | ||
539 | Options options1; | |
540 | options1.env = env_; | |
541 | options1.max_open_files = -1; | |
542 | OpenSecondaryWithColumnFamilies({kCfName1}, options1); | |
543 | ASSERT_EQ(2, handles_secondary_.size()); | |
544 | ||
545 | ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1")); | |
546 | ASSERT_OK(Flush(1 /*cf*/)); | |
547 | ||
548 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
549 | ReadOptions ropts; | |
550 | ropts.verify_checksums = true; | |
551 | std::string value; | |
552 | ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); | |
553 | ASSERT_EQ("foo_val_1", value); | |
554 | ||
555 | ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); | |
556 | Close(); | |
557 | CheckFileTypeCounts(dbname_, 1, 0, 1); | |
558 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
559 | value.clear(); | |
560 | ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); | |
561 | ASSERT_EQ("foo_val_1", value); | |
562 | } | |
563 | ||
564 | TEST_F(DBSecondaryTest, SwitchManifest) { | |
565 | Options options; | |
566 | options.env = env_; | |
567 | options.level0_file_num_compaction_trigger = 4; | |
568 | Reopen(options); | |
569 | ||
570 | Options options1; | |
571 | options1.env = env_; | |
572 | options1.max_open_files = -1; | |
573 | OpenSecondary(options1); | |
574 | ||
575 | const int kNumFiles = options.level0_file_num_compaction_trigger - 1; | |
576 | // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1, | |
577 | // ..., 9. | |
578 | const int kNumKeys = 10; | |
579 | // Create two sst | |
580 | for (int i = 0; i != kNumFiles; ++i) { | |
581 | for (int j = 0; j != kNumKeys; ++j) { | |
582 | ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i))); | |
583 | } | |
584 | ASSERT_OK(Flush()); | |
585 | } | |
586 | ||
587 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
588 | const auto& range_scan_db = [&]() { | |
589 | ReadOptions tmp_ropts; | |
590 | tmp_ropts.total_order_seek = true; | |
591 | tmp_ropts.verify_checksums = true; | |
592 | std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts)); | |
593 | int cnt = 0; | |
594 | for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) { | |
595 | ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString()); | |
596 | ASSERT_EQ("value_" + std::to_string(kNumFiles - 1), | |
597 | iter->value().ToString()); | |
598 | } | |
599 | }; | |
600 | ||
601 | range_scan_db(); | |
602 | ||
603 | // While secondary instance still keeps old MANIFEST open, we close primary, | |
604 | // restart primary, performs full compaction, close again, restart again so | |
605 | // that next time secondary tries to catch up with primary, the secondary | |
606 | // will skip the MANIFEST in middle. | |
607 | Reopen(options); | |
608 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
609 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
610 | ||
611 | Reopen(options); | |
612 | ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); | |
613 | ||
614 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
615 | range_scan_db(); | |
616 | } | |
f67539c2 TL |
617 | |
618 | // Here, "Snapshot" refers to the version edits written by | |
619 | // VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after | |
620 | // switching from the old one. | |
621 | TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) { | |
622 | Options options; | |
623 | options.env = env_; | |
624 | options.disable_auto_compactions = true; | |
625 | Reopen(options); | |
626 | ||
627 | Options options1; | |
628 | options1.env = env_; | |
629 | options1.max_open_files = -1; | |
630 | OpenSecondary(options1); | |
631 | ||
632 | ASSERT_OK(Put("0", "value0")); | |
633 | ASSERT_OK(Flush()); | |
634 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
635 | std::string value; | |
636 | ReadOptions ropts; | |
637 | ropts.verify_checksums = true; | |
638 | ASSERT_OK(db_secondary_->Get(ropts, "0", &value)); | |
639 | ASSERT_EQ("value0", value); | |
640 | ||
641 | Reopen(options); | |
642 | ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); | |
643 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
644 | } | |
645 | ||
646 | TEST_F(DBSecondaryTest, SwitchWAL) { | |
647 | const int kNumKeysPerMemtable = 1; | |
648 | Options options; | |
649 | options.env = env_; | |
650 | options.max_write_buffer_number = 4; | |
651 | options.min_write_buffer_number_to_merge = 2; | |
652 | options.memtable_factory.reset( | |
653 | new SpecialSkipListFactory(kNumKeysPerMemtable)); | |
654 | Reopen(options); | |
655 | ||
656 | Options options1; | |
657 | options1.env = env_; | |
658 | options1.max_open_files = -1; | |
659 | OpenSecondary(options1); | |
660 | ||
661 | const auto& verify_db = [](DB* db1, DB* db2) { | |
662 | ASSERT_NE(nullptr, db1); | |
663 | ASSERT_NE(nullptr, db2); | |
664 | ReadOptions read_opts; | |
665 | read_opts.verify_checksums = true; | |
666 | std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts)); | |
667 | std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts)); | |
668 | it1->SeekToFirst(); | |
669 | it2->SeekToFirst(); | |
670 | for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { | |
671 | ASSERT_EQ(it1->key(), it2->key()); | |
672 | ASSERT_EQ(it1->value(), it2->value()); | |
673 | } | |
674 | ASSERT_FALSE(it1->Valid()); | |
675 | ASSERT_FALSE(it2->Valid()); | |
676 | ||
677 | for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { | |
678 | std::string value; | |
679 | ASSERT_OK(db2->Get(read_opts, it1->key(), &value)); | |
680 | ASSERT_EQ(it1->value(), value); | |
681 | } | |
682 | for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { | |
683 | std::string value; | |
684 | ASSERT_OK(db1->Get(read_opts, it2->key(), &value)); | |
685 | ASSERT_EQ(it2->value(), value); | |
686 | } | |
687 | }; | |
688 | for (int k = 0; k != 16; ++k) { | |
689 | ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k))); | |
690 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
691 | verify_db(dbfull(), db_secondary_); | |
692 | } | |
693 | } | |
694 | ||
695 | TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { | |
696 | const int kNumKeysPerMemtable = 1; | |
697 | SyncPoint::GetInstance()->DisableProcessing(); | |
698 | SyncPoint::GetInstance()->LoadDependency( | |
699 | {{"DBImpl::BackgroundCallFlush:ContextCleanedUp", | |
700 | "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}}); | |
701 | SyncPoint::GetInstance()->EnableProcessing(); | |
702 | const std::string kCFName1 = "pikachu"; | |
703 | Options options; | |
704 | options.env = env_; | |
705 | options.max_write_buffer_number = 4; | |
706 | options.min_write_buffer_number_to_merge = 2; | |
707 | options.memtable_factory.reset( | |
708 | new SpecialSkipListFactory(kNumKeysPerMemtable)); | |
709 | CreateAndReopenWithCF({kCFName1}, options); | |
710 | ||
711 | Options options1; | |
712 | options1.env = env_; | |
713 | options1.max_open_files = -1; | |
714 | OpenSecondaryWithColumnFamilies({kCFName1}, options1); | |
715 | ASSERT_EQ(2, handles_secondary_.size()); | |
716 | ||
717 | const auto& verify_db = [](DB* db1, | |
718 | const std::vector<ColumnFamilyHandle*>& handles1, | |
719 | DB* db2, | |
720 | const std::vector<ColumnFamilyHandle*>& handles2) { | |
721 | ASSERT_NE(nullptr, db1); | |
722 | ASSERT_NE(nullptr, db2); | |
723 | ReadOptions read_opts; | |
724 | read_opts.verify_checksums = true; | |
725 | ASSERT_EQ(handles1.size(), handles2.size()); | |
726 | for (size_t i = 0; i != handles1.size(); ++i) { | |
727 | std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i])); | |
728 | std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i])); | |
729 | it1->SeekToFirst(); | |
730 | it2->SeekToFirst(); | |
731 | for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { | |
732 | ASSERT_EQ(it1->key(), it2->key()); | |
733 | ASSERT_EQ(it1->value(), it2->value()); | |
734 | } | |
735 | ASSERT_FALSE(it1->Valid()); | |
736 | ASSERT_FALSE(it2->Valid()); | |
737 | ||
738 | for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { | |
739 | std::string value; | |
740 | ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value)); | |
741 | ASSERT_EQ(it1->value(), value); | |
742 | } | |
743 | for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { | |
744 | std::string value; | |
745 | ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value)); | |
746 | ASSERT_EQ(it2->value(), value); | |
747 | } | |
748 | } | |
749 | }; | |
750 | for (int k = 0; k != 8; ++k) { | |
751 | ASSERT_OK( | |
752 | Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); | |
753 | ASSERT_OK( | |
754 | Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); | |
755 | TEST_SYNC_POINT( | |
756 | "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"); | |
757 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
758 | verify_db(dbfull(), handles_, db_secondary_, handles_secondary_); | |
759 | SyncPoint::GetInstance()->ClearTrace(); | |
760 | } | |
761 | } | |
762 | ||
763 | TEST_F(DBSecondaryTest, CatchUpAfterFlush) { | |
764 | const int kNumKeysPerMemtable = 16; | |
765 | Options options; | |
766 | options.env = env_; | |
767 | options.max_write_buffer_number = 4; | |
768 | options.min_write_buffer_number_to_merge = 2; | |
769 | options.memtable_factory.reset( | |
770 | new SpecialSkipListFactory(kNumKeysPerMemtable)); | |
771 | Reopen(options); | |
772 | ||
773 | Options options1; | |
774 | options1.env = env_; | |
775 | options1.max_open_files = -1; | |
776 | OpenSecondary(options1); | |
777 | ||
778 | WriteOptions write_opts; | |
779 | WriteBatch wb; | |
20effc67 TL |
780 | ASSERT_OK(wb.Put("key0", "value0")); |
781 | ASSERT_OK(wb.Put("key1", "value1")); | |
f67539c2 TL |
782 | ASSERT_OK(dbfull()->Write(write_opts, &wb)); |
783 | ReadOptions read_opts; | |
784 | std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts)); | |
785 | iter1->Seek("key0"); | |
786 | ASSERT_FALSE(iter1->Valid()); | |
787 | iter1->Seek("key1"); | |
788 | ASSERT_FALSE(iter1->Valid()); | |
789 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
790 | iter1->Seek("key0"); | |
791 | ASSERT_FALSE(iter1->Valid()); | |
792 | iter1->Seek("key1"); | |
793 | ASSERT_FALSE(iter1->Valid()); | |
20effc67 | 794 | ASSERT_OK(iter1->status()); |
f67539c2 TL |
795 | std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts)); |
796 | iter2->Seek("key0"); | |
797 | ASSERT_TRUE(iter2->Valid()); | |
798 | ASSERT_EQ("value0", iter2->value()); | |
799 | iter2->Seek("key1"); | |
800 | ASSERT_TRUE(iter2->Valid()); | |
20effc67 | 801 | ASSERT_OK(iter2->status()); |
f67539c2 TL |
802 | ASSERT_EQ("value1", iter2->value()); |
803 | ||
804 | { | |
805 | WriteBatch wb1; | |
20effc67 TL |
806 | ASSERT_OK(wb1.Put("key0", "value01")); |
807 | ASSERT_OK(wb1.Put("key1", "value11")); | |
f67539c2 TL |
808 | ASSERT_OK(dbfull()->Write(write_opts, &wb1)); |
809 | } | |
810 | ||
811 | { | |
812 | WriteBatch wb2; | |
20effc67 TL |
813 | ASSERT_OK(wb2.Put("key0", "new_value0")); |
814 | ASSERT_OK(wb2.Delete("key1")); | |
f67539c2 TL |
815 | ASSERT_OK(dbfull()->Write(write_opts, &wb2)); |
816 | } | |
817 | ||
818 | ASSERT_OK(Flush()); | |
819 | ||
820 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
821 | std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts)); | |
822 | // iter3 should not see value01 and value11 at all. | |
823 | iter3->Seek("key0"); | |
824 | ASSERT_TRUE(iter3->Valid()); | |
825 | ASSERT_EQ("new_value0", iter3->value()); | |
826 | iter3->Seek("key1"); | |
827 | ASSERT_FALSE(iter3->Valid()); | |
20effc67 | 828 | ASSERT_OK(iter3->status()); |
f67539c2 TL |
829 | } |
830 | ||
831 | TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) { | |
832 | bool called = false; | |
833 | Options options; | |
834 | options.env = env_; | |
835 | options.disable_auto_compactions = true; | |
836 | Reopen(options); | |
837 | SyncPoint::GetInstance()->DisableProcessing(); | |
838 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
839 | SyncPoint::GetInstance()->SetCallBack( | |
840 | "DBImplSecondary::CheckConsistency:AfterFirstAttempt", [&](void* arg) { | |
841 | ASSERT_NE(nullptr, arg); | |
842 | called = true; | |
843 | auto* s = reinterpret_cast<Status*>(arg); | |
844 | ASSERT_NOK(*s); | |
845 | }); | |
846 | SyncPoint::GetInstance()->LoadDependency( | |
847 | {{"DBImpl::CheckConsistency:AfterGetLiveFilesMetaData", | |
848 | "BackgroundCallCompaction:0"}, | |
849 | {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", | |
850 | "DBImpl::CheckConsistency:BeforeGetFileSize"}}); | |
851 | SyncPoint::GetInstance()->EnableProcessing(); | |
852 | ||
853 | ASSERT_OK(Put("a", "value0")); | |
854 | ASSERT_OK(Put("c", "value0")); | |
855 | ASSERT_OK(Flush()); | |
856 | ASSERT_OK(Put("b", "value1")); | |
857 | ASSERT_OK(Put("d", "value1")); | |
858 | ASSERT_OK(Flush()); | |
859 | port::Thread thread([this]() { | |
860 | Options opts; | |
861 | opts.env = env_; | |
862 | opts.max_open_files = -1; | |
863 | OpenSecondary(opts); | |
864 | }); | |
865 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
866 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
867 | thread.join(); | |
868 | ASSERT_TRUE(called); | |
869 | } | |
20effc67 TL |
870 | |
871 | TEST_F(DBSecondaryTest, StartFromInconsistent) { | |
872 | Options options = CurrentOptions(); | |
873 | DestroyAndReopen(options); | |
874 | ASSERT_OK(Put("foo", "value")); | |
875 | ASSERT_OK(Flush()); | |
876 | SyncPoint::GetInstance()->DisableProcessing(); | |
877 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
878 | SyncPoint::GetInstance()->SetCallBack( | |
879 | "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { | |
880 | ASSERT_NE(nullptr, arg); | |
881 | *(reinterpret_cast<Status*>(arg)) = | |
882 | Status::Corruption("Inject corruption"); | |
883 | }); | |
884 | SyncPoint::GetInstance()->EnableProcessing(); | |
885 | Options options1; | |
886 | options1.env = env_; | |
887 | Status s = TryOpenSecondary(options1); | |
888 | ASSERT_TRUE(s.IsCorruption()); | |
889 | } | |
890 | ||
891 | TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) { | |
892 | Options options = CurrentOptions(); | |
893 | DestroyAndReopen(options); | |
894 | ASSERT_OK(Put("foo", "value")); | |
895 | ASSERT_OK(Flush()); | |
896 | ||
897 | Options options1; | |
898 | options1.env = env_; | |
899 | OpenSecondary(options1); | |
900 | ||
901 | { | |
902 | std::string value; | |
903 | ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value)); | |
904 | ASSERT_EQ("value", value); | |
905 | } | |
906 | ||
907 | ASSERT_OK(Put("bar", "value1")); | |
908 | ASSERT_OK(Flush()); | |
909 | ||
910 | SyncPoint::GetInstance()->DisableProcessing(); | |
911 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
912 | SyncPoint::GetInstance()->SetCallBack( | |
913 | "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { | |
914 | ASSERT_NE(nullptr, arg); | |
915 | *(reinterpret_cast<Status*>(arg)) = | |
916 | Status::Corruption("Inject corruption"); | |
917 | }); | |
918 | SyncPoint::GetInstance()->EnableProcessing(); | |
919 | Status s = db_secondary_->TryCatchUpWithPrimary(); | |
920 | ASSERT_TRUE(s.IsCorruption()); | |
921 | } | |
494da23a TL |
922 | #endif //! ROCKSDB_LITE |
923 | ||
f67539c2 | 924 | } // namespace ROCKSDB_NAMESPACE |
494da23a TL |
925 | |
926 | int main(int argc, char** argv) { | |
f67539c2 | 927 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
494da23a TL |
928 | ::testing::InitGoogleTest(&argc, argv); |
929 | return RUN_ALL_TESTS(); | |
930 | } |