]>
Commit | Line | Data |
---|---|---|
494da23a TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/db_impl_secondary.h" | |
11 | #include "db/db_test_util.h" | |
12 | #include "port/stack_trace.h" | |
13 | #include "util/fault_injection_test_env.h" | |
14 | #include "util/sync_point.h" | |
15 | ||
16 | namespace rocksdb { | |
17 | ||
18 | #ifndef ROCKSDB_LITE | |
19 | class DBSecondaryTest : public DBTestBase { | |
20 | public: | |
21 | DBSecondaryTest() | |
22 | : DBTestBase("/db_secondary_test"), | |
23 | secondary_path_(), | |
24 | handles_secondary_(), | |
25 | db_secondary_(nullptr) { | |
26 | secondary_path_ = | |
27 | test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); | |
28 | } | |
29 | ||
30 | ~DBSecondaryTest() override { | |
31 | CloseSecondary(); | |
32 | if (getenv("KEEP_DB") != nullptr) { | |
33 | fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str()); | |
34 | } else { | |
35 | Options options; | |
36 | options.env = env_; | |
37 | EXPECT_OK(DestroyDB(secondary_path_, options)); | |
38 | } | |
39 | } | |
40 | ||
41 | protected: | |
42 | Status ReopenAsSecondary(const Options& options) { | |
43 | return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_); | |
44 | } | |
45 | ||
46 | void OpenSecondary(const Options& options); | |
47 | ||
48 | void OpenSecondaryWithColumnFamilies( | |
49 | const std::vector<std::string>& column_families, const Options& options); | |
50 | ||
51 | void CloseSecondary() { | |
52 | for (auto h : handles_secondary_) { | |
53 | db_secondary_->DestroyColumnFamilyHandle(h); | |
54 | } | |
55 | handles_secondary_.clear(); | |
56 | delete db_secondary_; | |
57 | db_secondary_ = nullptr; | |
58 | } | |
59 | ||
60 | DBImplSecondary* db_secondary_full() { | |
61 | return static_cast<DBImplSecondary*>(db_secondary_); | |
62 | } | |
63 | ||
64 | void CheckFileTypeCounts(const std::string& dir, int expected_log, | |
65 | int expected_sst, int expected_manifest) const; | |
66 | ||
67 | std::string secondary_path_; | |
68 | std::vector<ColumnFamilyHandle*> handles_secondary_; | |
69 | DB* db_secondary_; | |
70 | }; | |
71 | ||
72 | void DBSecondaryTest::OpenSecondary(const Options& options) { | |
73 | Status s = | |
74 | DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); | |
75 | ASSERT_OK(s); | |
76 | } | |
77 | ||
78 | void DBSecondaryTest::OpenSecondaryWithColumnFamilies( | |
79 | const std::vector<std::string>& column_families, const Options& options) { | |
80 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
81 | cf_descs.emplace_back(kDefaultColumnFamilyName, options); | |
82 | for (const auto& cf_name : column_families) { | |
83 | cf_descs.emplace_back(cf_name, options); | |
84 | } | |
85 | Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs, | |
86 | &handles_secondary_, &db_secondary_); | |
87 | ASSERT_OK(s); | |
88 | } | |
89 | ||
90 | void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir, | |
91 | int expected_log, int expected_sst, | |
92 | int expected_manifest) const { | |
93 | std::vector<std::string> filenames; | |
94 | env_->GetChildren(dir, &filenames); | |
95 | ||
96 | int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0; | |
97 | for (auto file : filenames) { | |
98 | uint64_t number; | |
99 | FileType type; | |
100 | if (ParseFileName(file, &number, &type)) { | |
101 | log_cnt += (type == kLogFile); | |
102 | sst_cnt += (type == kTableFile); | |
103 | manifest_cnt += (type == kDescriptorFile); | |
104 | } | |
105 | } | |
106 | ASSERT_EQ(expected_log, log_cnt); | |
107 | ASSERT_EQ(expected_sst, sst_cnt); | |
108 | ASSERT_EQ(expected_manifest, manifest_cnt); | |
109 | } | |
110 | ||
111 | TEST_F(DBSecondaryTest, ReopenAsSecondary) { | |
112 | Options options; | |
113 | options.env = env_; | |
114 | Reopen(options); | |
115 | ASSERT_OK(Put("foo", "foo_value")); | |
116 | ASSERT_OK(Put("bar", "bar_value")); | |
117 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
118 | Close(); | |
119 | ||
120 | ASSERT_OK(ReopenAsSecondary(options)); | |
121 | ASSERT_EQ("foo_value", Get("foo")); | |
122 | ASSERT_EQ("bar_value", Get("bar")); | |
123 | ReadOptions ropts; | |
124 | ropts.verify_checksums = true; | |
125 | auto db1 = static_cast<DBImplSecondary*>(db_); | |
126 | ASSERT_NE(nullptr, db1); | |
127 | Iterator* iter = db1->NewIterator(ropts); | |
128 | ASSERT_NE(nullptr, iter); | |
129 | size_t count = 0; | |
130 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
131 | if (0 == count) { | |
132 | ASSERT_EQ("bar", iter->key().ToString()); | |
133 | ASSERT_EQ("bar_value", iter->value().ToString()); | |
134 | } else if (1 == count) { | |
135 | ASSERT_EQ("foo", iter->key().ToString()); | |
136 | ASSERT_EQ("foo_value", iter->value().ToString()); | |
137 | } | |
138 | ++count; | |
139 | } | |
140 | delete iter; | |
141 | ASSERT_EQ(2, count); | |
142 | } | |
143 | ||
144 | TEST_F(DBSecondaryTest, OpenAsSecondary) { | |
145 | Options options; | |
146 | options.env = env_; | |
147 | options.level0_file_num_compaction_trigger = 4; | |
148 | Reopen(options); | |
149 | for (int i = 0; i < 3; ++i) { | |
150 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
151 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
152 | ASSERT_OK(Flush()); | |
153 | } | |
154 | Options options1; | |
155 | options1.env = env_; | |
156 | options1.max_open_files = -1; | |
157 | OpenSecondary(options1); | |
158 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
159 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
160 | ||
161 | ReadOptions ropts; | |
162 | ropts.verify_checksums = true; | |
163 | const auto verify_db_func = [&](const std::string& foo_val, | |
164 | const std::string& bar_val) { | |
165 | std::string value; | |
166 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
167 | ASSERT_EQ(foo_val, value); | |
168 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
169 | ASSERT_EQ(bar_val, value); | |
170 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
171 | ASSERT_NE(nullptr, iter); | |
172 | iter->Seek("foo"); | |
173 | ASSERT_TRUE(iter->Valid()); | |
174 | ASSERT_EQ("foo", iter->key().ToString()); | |
175 | ASSERT_EQ(foo_val, iter->value().ToString()); | |
176 | iter->Seek("bar"); | |
177 | ASSERT_TRUE(iter->Valid()); | |
178 | ASSERT_EQ("bar", iter->key().ToString()); | |
179 | ASSERT_EQ(bar_val, iter->value().ToString()); | |
180 | size_t count = 0; | |
181 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
182 | ++count; | |
183 | } | |
184 | ASSERT_EQ(2, count); | |
185 | delete iter; | |
186 | }; | |
187 | ||
188 | verify_db_func("foo_value2", "bar_value2"); | |
189 | ||
190 | ASSERT_OK(Put("foo", "new_foo_value")); | |
191 | ASSERT_OK(Put("bar", "new_bar_value")); | |
192 | ASSERT_OK(Flush()); | |
193 | ||
194 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
195 | verify_db_func("new_foo_value", "new_bar_value"); | |
196 | } | |
197 | ||
198 | TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { | |
199 | Options options; | |
200 | options.env = env_; | |
201 | CreateAndReopenWithCF({"pikachu"}, options); | |
202 | ||
203 | Options options1; | |
204 | options1.env = env_; | |
205 | options1.max_open_files = -1; | |
206 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
207 | cf_descs.emplace_back(kDefaultColumnFamilyName, options1); | |
208 | cf_descs.emplace_back("pikachu", options1); | |
209 | cf_descs.emplace_back("eevee", options1); | |
210 | Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs, | |
211 | &handles_secondary_, &db_secondary_); | |
212 | ASSERT_NOK(s); | |
213 | } | |
214 | ||
215 | TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) { | |
216 | Options options; | |
217 | options.env = env_; | |
218 | CreateAndReopenWithCF({"pikachu"}, options); | |
219 | Options options1; | |
220 | options1.env = env_; | |
221 | options1.max_open_files = -1; | |
222 | OpenSecondary(options1); | |
223 | ASSERT_EQ(0, handles_secondary_.size()); | |
224 | ASSERT_NE(nullptr, db_secondary_); | |
225 | ||
226 | ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value")); | |
227 | ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value")); | |
228 | ASSERT_OK(Flush(0 /*cf*/)); | |
229 | ASSERT_OK(Flush(1 /*cf*/)); | |
230 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
231 | ReadOptions ropts; | |
232 | ropts.verify_checksums = true; | |
233 | std::string value; | |
234 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
235 | ASSERT_EQ("foo_value", value); | |
236 | } | |
237 | ||
238 | TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) { | |
239 | Options options; | |
240 | options.env = env_; | |
241 | Reopen(options); | |
242 | Close(); | |
243 | ||
244 | SyncPoint::GetInstance()->DisableProcessing(); | |
245 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
246 | SyncPoint::GetInstance()->LoadDependency( | |
247 | {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0", | |
248 | "VersionSet::ProcessManifestWrites:BeforeNewManifest"}, | |
249 | {"VersionSet::ProcessManifestWrites:AfterNewManifest", | |
250 | "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:" | |
251 | "1"}}); | |
252 | SyncPoint::GetInstance()->EnableProcessing(); | |
253 | ||
254 | // Make sure db calls RecoverLogFiles so as to trigger a manifest write, | |
255 | // which causes the db to switch to a new MANIFEST upon start. | |
256 | port::Thread ro_db_thread([&]() { | |
257 | Options options1; | |
258 | options1.env = env_; | |
259 | options1.max_open_files = -1; | |
260 | OpenSecondary(options1); | |
261 | CloseSecondary(); | |
262 | }); | |
263 | Reopen(options); | |
264 | ro_db_thread.join(); | |
265 | } | |
266 | ||
267 | TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { | |
268 | Options options; | |
269 | options.env = env_; | |
270 | options.level0_file_num_compaction_trigger = 4; | |
271 | Reopen(options); | |
272 | for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { | |
273 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
274 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
275 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
276 | } | |
277 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); | |
278 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
279 | Options options1; | |
280 | options1.env = env_; | |
281 | options1.max_open_files = -1; | |
282 | OpenSecondary(options1); | |
283 | ReadOptions ropts; | |
284 | ropts.verify_checksums = true; | |
285 | std::string value; | |
286 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
287 | ASSERT_EQ("foo_value" + | |
288 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
289 | value); | |
290 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
291 | ASSERT_EQ("bar_value" + | |
292 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
293 | value); | |
294 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
295 | ASSERT_NE(nullptr, iter); | |
296 | iter->Seek("bar"); | |
297 | ASSERT_TRUE(iter->Valid()); | |
298 | ASSERT_EQ("bar", iter->key().ToString()); | |
299 | ASSERT_EQ("bar_value" + | |
300 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
301 | iter->value().ToString()); | |
302 | iter->Seek("foo"); | |
303 | ASSERT_TRUE(iter->Valid()); | |
304 | ASSERT_EQ("foo", iter->key().ToString()); | |
305 | ASSERT_EQ("foo_value" + | |
306 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
307 | iter->value().ToString()); | |
308 | size_t count = 0; | |
309 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
310 | ++count; | |
311 | } | |
312 | ASSERT_EQ(2, count); | |
313 | delete iter; | |
314 | } | |
315 | ||
316 | TEST_F(DBSecondaryTest, MissingTableFile) { | |
317 | int table_files_not_exist = 0; | |
318 | SyncPoint::GetInstance()->DisableProcessing(); | |
319 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
320 | SyncPoint::GetInstance()->SetCallBack( | |
321 | "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", | |
322 | [&](void* arg) { | |
323 | Status s = *reinterpret_cast<Status*>(arg); | |
324 | if (s.IsPathNotFound()) { | |
325 | ++table_files_not_exist; | |
326 | } else if (!s.ok()) { | |
327 | assert(false); // Should not reach here | |
328 | } | |
329 | }); | |
330 | SyncPoint::GetInstance()->EnableProcessing(); | |
331 | Options options; | |
332 | options.env = env_; | |
333 | options.level0_file_num_compaction_trigger = 4; | |
334 | Reopen(options); | |
335 | ||
336 | Options options1; | |
337 | options1.env = env_; | |
338 | options1.max_open_files = -1; | |
339 | OpenSecondary(options1); | |
340 | ||
341 | for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { | |
342 | ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); | |
343 | ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); | |
344 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
345 | } | |
346 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); | |
347 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
348 | ||
349 | ASSERT_NE(nullptr, db_secondary_full()); | |
350 | ReadOptions ropts; | |
351 | ropts.verify_checksums = true; | |
352 | std::string value; | |
353 | ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value)); | |
354 | ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value)); | |
355 | ||
356 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
357 | ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist); | |
358 | ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); | |
359 | ASSERT_EQ("foo_value" + | |
360 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
361 | value); | |
362 | ASSERT_OK(db_secondary_->Get(ropts, "bar", &value)); | |
363 | ASSERT_EQ("bar_value" + | |
364 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
365 | value); | |
366 | Iterator* iter = db_secondary_->NewIterator(ropts); | |
367 | ASSERT_NE(nullptr, iter); | |
368 | iter->Seek("bar"); | |
369 | ASSERT_TRUE(iter->Valid()); | |
370 | ASSERT_EQ("bar", iter->key().ToString()); | |
371 | ASSERT_EQ("bar_value" + | |
372 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
373 | iter->value().ToString()); | |
374 | iter->Seek("foo"); | |
375 | ASSERT_TRUE(iter->Valid()); | |
376 | ASSERT_EQ("foo", iter->key().ToString()); | |
377 | ASSERT_EQ("foo_value" + | |
378 | std::to_string(options.level0_file_num_compaction_trigger - 1), | |
379 | iter->value().ToString()); | |
380 | size_t count = 0; | |
381 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
382 | ++count; | |
383 | } | |
384 | ASSERT_EQ(2, count); | |
385 | delete iter; | |
386 | } | |
387 | ||
388 | TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) { | |
389 | Options options; | |
390 | options.env = env_; | |
391 | const std::string kCfName1 = "pikachu"; | |
392 | CreateAndReopenWithCF({kCfName1}, options); | |
393 | ||
394 | Options options1; | |
395 | options1.env = env_; | |
396 | options1.max_open_files = -1; | |
397 | OpenSecondaryWithColumnFamilies({kCfName1}, options1); | |
398 | ASSERT_EQ(2, handles_secondary_.size()); | |
399 | ||
400 | ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1")); | |
401 | ASSERT_OK(Flush(1 /*cf*/)); | |
402 | ||
403 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
404 | ReadOptions ropts; | |
405 | ropts.verify_checksums = true; | |
406 | std::string value; | |
407 | ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); | |
408 | ASSERT_EQ("foo_val_1", value); | |
409 | ||
410 | ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); | |
411 | Close(); | |
412 | CheckFileTypeCounts(dbname_, 1, 0, 1); | |
413 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
414 | value.clear(); | |
415 | ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value)); | |
416 | ASSERT_EQ("foo_val_1", value); | |
417 | } | |
418 | ||
419 | TEST_F(DBSecondaryTest, SwitchManifest) { | |
420 | Options options; | |
421 | options.env = env_; | |
422 | options.level0_file_num_compaction_trigger = 4; | |
423 | Reopen(options); | |
424 | ||
425 | Options options1; | |
426 | options1.env = env_; | |
427 | options1.max_open_files = -1; | |
428 | OpenSecondary(options1); | |
429 | ||
430 | const int kNumFiles = options.level0_file_num_compaction_trigger - 1; | |
431 | // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1, | |
432 | // ..., 9. | |
433 | const int kNumKeys = 10; | |
434 | // Create two sst | |
435 | for (int i = 0; i != kNumFiles; ++i) { | |
436 | for (int j = 0; j != kNumKeys; ++j) { | |
437 | ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i))); | |
438 | } | |
439 | ASSERT_OK(Flush()); | |
440 | } | |
441 | ||
442 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
443 | const auto& range_scan_db = [&]() { | |
444 | ReadOptions tmp_ropts; | |
445 | tmp_ropts.total_order_seek = true; | |
446 | tmp_ropts.verify_checksums = true; | |
447 | std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts)); | |
448 | int cnt = 0; | |
449 | for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) { | |
450 | ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString()); | |
451 | ASSERT_EQ("value_" + std::to_string(kNumFiles - 1), | |
452 | iter->value().ToString()); | |
453 | } | |
454 | }; | |
455 | ||
456 | range_scan_db(); | |
457 | ||
458 | // While secondary instance still keeps old MANIFEST open, we close primary, | |
459 | // restart primary, performs full compaction, close again, restart again so | |
460 | // that next time secondary tries to catch up with primary, the secondary | |
461 | // will skip the MANIFEST in middle. | |
462 | Reopen(options); | |
463 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
464 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); | |
465 | ||
466 | Reopen(options); | |
467 | ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); | |
468 | ||
469 | ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); | |
470 | range_scan_db(); | |
471 | } | |
472 | #endif //! ROCKSDB_LITE | |
473 | ||
474 | } // namespace rocksdb | |
475 | ||
476 | int main(int argc, char** argv) { | |
477 | rocksdb::port::InstallStackTraceHandler(); | |
478 | ::testing::InitGoogleTest(&argc, argv); | |
479 | return RUN_ALL_TESTS(); | |
480 | } |