]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_secondary_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_secondary_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_impl_secondary.h"
11 #include "db/db_test_util.h"
12 #include "port/stack_trace.h"
13 #include "util/fault_injection_test_env.h"
14 #include "util/sync_point.h"
15
16 namespace rocksdb {
17
18 #ifndef ROCKSDB_LITE
19 class DBSecondaryTest : public DBTestBase {
20 public:
21 DBSecondaryTest()
22 : DBTestBase("/db_secondary_test"),
23 secondary_path_(),
24 handles_secondary_(),
25 db_secondary_(nullptr) {
26 secondary_path_ =
27 test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
28 }
29
30 ~DBSecondaryTest() override {
31 CloseSecondary();
32 if (getenv("KEEP_DB") != nullptr) {
33 fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
34 } else {
35 Options options;
36 options.env = env_;
37 EXPECT_OK(DestroyDB(secondary_path_, options));
38 }
39 }
40
41 protected:
42 Status ReopenAsSecondary(const Options& options) {
43 return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
44 }
45
46 void OpenSecondary(const Options& options);
47
48 void OpenSecondaryWithColumnFamilies(
49 const std::vector<std::string>& column_families, const Options& options);
50
51 void CloseSecondary() {
52 for (auto h : handles_secondary_) {
53 db_secondary_->DestroyColumnFamilyHandle(h);
54 }
55 handles_secondary_.clear();
56 delete db_secondary_;
57 db_secondary_ = nullptr;
58 }
59
60 DBImplSecondary* db_secondary_full() {
61 return static_cast<DBImplSecondary*>(db_secondary_);
62 }
63
64 void CheckFileTypeCounts(const std::string& dir, int expected_log,
65 int expected_sst, int expected_manifest) const;
66
67 std::string secondary_path_;
68 std::vector<ColumnFamilyHandle*> handles_secondary_;
69 DB* db_secondary_;
70 };
71
72 void DBSecondaryTest::OpenSecondary(const Options& options) {
73 Status s =
74 DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
75 ASSERT_OK(s);
76 }
77
78 void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
79 const std::vector<std::string>& column_families, const Options& options) {
80 std::vector<ColumnFamilyDescriptor> cf_descs;
81 cf_descs.emplace_back(kDefaultColumnFamilyName, options);
82 for (const auto& cf_name : column_families) {
83 cf_descs.emplace_back(cf_name, options);
84 }
85 Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
86 &handles_secondary_, &db_secondary_);
87 ASSERT_OK(s);
88 }
89
90 void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
91 int expected_log, int expected_sst,
92 int expected_manifest) const {
93 std::vector<std::string> filenames;
94 env_->GetChildren(dir, &filenames);
95
96 int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
97 for (auto file : filenames) {
98 uint64_t number;
99 FileType type;
100 if (ParseFileName(file, &number, &type)) {
101 log_cnt += (type == kLogFile);
102 sst_cnt += (type == kTableFile);
103 manifest_cnt += (type == kDescriptorFile);
104 }
105 }
106 ASSERT_EQ(expected_log, log_cnt);
107 ASSERT_EQ(expected_sst, sst_cnt);
108 ASSERT_EQ(expected_manifest, manifest_cnt);
109 }
110
111 TEST_F(DBSecondaryTest, ReopenAsSecondary) {
112 Options options;
113 options.env = env_;
114 Reopen(options);
115 ASSERT_OK(Put("foo", "foo_value"));
116 ASSERT_OK(Put("bar", "bar_value"));
117 ASSERT_OK(dbfull()->Flush(FlushOptions()));
118 Close();
119
120 ASSERT_OK(ReopenAsSecondary(options));
121 ASSERT_EQ("foo_value", Get("foo"));
122 ASSERT_EQ("bar_value", Get("bar"));
123 ReadOptions ropts;
124 ropts.verify_checksums = true;
125 auto db1 = static_cast<DBImplSecondary*>(db_);
126 ASSERT_NE(nullptr, db1);
127 Iterator* iter = db1->NewIterator(ropts);
128 ASSERT_NE(nullptr, iter);
129 size_t count = 0;
130 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
131 if (0 == count) {
132 ASSERT_EQ("bar", iter->key().ToString());
133 ASSERT_EQ("bar_value", iter->value().ToString());
134 } else if (1 == count) {
135 ASSERT_EQ("foo", iter->key().ToString());
136 ASSERT_EQ("foo_value", iter->value().ToString());
137 }
138 ++count;
139 }
140 delete iter;
141 ASSERT_EQ(2, count);
142 }
143
144 TEST_F(DBSecondaryTest, OpenAsSecondary) {
145 Options options;
146 options.env = env_;
147 options.level0_file_num_compaction_trigger = 4;
148 Reopen(options);
149 for (int i = 0; i < 3; ++i) {
150 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
151 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
152 ASSERT_OK(Flush());
153 }
154 Options options1;
155 options1.env = env_;
156 options1.max_open_files = -1;
157 OpenSecondary(options1);
158 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
159 ASSERT_OK(dbfull()->TEST_WaitForCompact());
160
161 ReadOptions ropts;
162 ropts.verify_checksums = true;
163 const auto verify_db_func = [&](const std::string& foo_val,
164 const std::string& bar_val) {
165 std::string value;
166 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
167 ASSERT_EQ(foo_val, value);
168 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
169 ASSERT_EQ(bar_val, value);
170 Iterator* iter = db_secondary_->NewIterator(ropts);
171 ASSERT_NE(nullptr, iter);
172 iter->Seek("foo");
173 ASSERT_TRUE(iter->Valid());
174 ASSERT_EQ("foo", iter->key().ToString());
175 ASSERT_EQ(foo_val, iter->value().ToString());
176 iter->Seek("bar");
177 ASSERT_TRUE(iter->Valid());
178 ASSERT_EQ("bar", iter->key().ToString());
179 ASSERT_EQ(bar_val, iter->value().ToString());
180 size_t count = 0;
181 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
182 ++count;
183 }
184 ASSERT_EQ(2, count);
185 delete iter;
186 };
187
188 verify_db_func("foo_value2", "bar_value2");
189
190 ASSERT_OK(Put("foo", "new_foo_value"));
191 ASSERT_OK(Put("bar", "new_bar_value"));
192 ASSERT_OK(Flush());
193
194 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
195 verify_db_func("new_foo_value", "new_bar_value");
196 }
197
198 TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
199 Options options;
200 options.env = env_;
201 CreateAndReopenWithCF({"pikachu"}, options);
202
203 Options options1;
204 options1.env = env_;
205 options1.max_open_files = -1;
206 std::vector<ColumnFamilyDescriptor> cf_descs;
207 cf_descs.emplace_back(kDefaultColumnFamilyName, options1);
208 cf_descs.emplace_back("pikachu", options1);
209 cf_descs.emplace_back("eevee", options1);
210 Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs,
211 &handles_secondary_, &db_secondary_);
212 ASSERT_NOK(s);
213 }
214
215 TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
216 Options options;
217 options.env = env_;
218 CreateAndReopenWithCF({"pikachu"}, options);
219 Options options1;
220 options1.env = env_;
221 options1.max_open_files = -1;
222 OpenSecondary(options1);
223 ASSERT_EQ(0, handles_secondary_.size());
224 ASSERT_NE(nullptr, db_secondary_);
225
226 ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
227 ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
228 ASSERT_OK(Flush(0 /*cf*/));
229 ASSERT_OK(Flush(1 /*cf*/));
230 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
231 ReadOptions ropts;
232 ropts.verify_checksums = true;
233 std::string value;
234 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
235 ASSERT_EQ("foo_value", value);
236 }
237
238 TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
239 Options options;
240 options.env = env_;
241 Reopen(options);
242 Close();
243
244 SyncPoint::GetInstance()->DisableProcessing();
245 SyncPoint::GetInstance()->ClearAllCallBacks();
246 SyncPoint::GetInstance()->LoadDependency(
247 {{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
248 "VersionSet::ProcessManifestWrites:BeforeNewManifest"},
249 {"VersionSet::ProcessManifestWrites:AfterNewManifest",
250 "ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
251 "1"}});
252 SyncPoint::GetInstance()->EnableProcessing();
253
254 // Make sure db calls RecoverLogFiles so as to trigger a manifest write,
255 // which causes the db to switch to a new MANIFEST upon start.
256 port::Thread ro_db_thread([&]() {
257 Options options1;
258 options1.env = env_;
259 options1.max_open_files = -1;
260 OpenSecondary(options1);
261 CloseSecondary();
262 });
263 Reopen(options);
264 ro_db_thread.join();
265 }
266
267 TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
268 Options options;
269 options.env = env_;
270 options.level0_file_num_compaction_trigger = 4;
271 Reopen(options);
272 for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
273 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
274 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
275 ASSERT_OK(dbfull()->Flush(FlushOptions()));
276 }
277 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
278 ASSERT_OK(dbfull()->TEST_WaitForCompact());
279 Options options1;
280 options1.env = env_;
281 options1.max_open_files = -1;
282 OpenSecondary(options1);
283 ReadOptions ropts;
284 ropts.verify_checksums = true;
285 std::string value;
286 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
287 ASSERT_EQ("foo_value" +
288 std::to_string(options.level0_file_num_compaction_trigger - 1),
289 value);
290 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
291 ASSERT_EQ("bar_value" +
292 std::to_string(options.level0_file_num_compaction_trigger - 1),
293 value);
294 Iterator* iter = db_secondary_->NewIterator(ropts);
295 ASSERT_NE(nullptr, iter);
296 iter->Seek("bar");
297 ASSERT_TRUE(iter->Valid());
298 ASSERT_EQ("bar", iter->key().ToString());
299 ASSERT_EQ("bar_value" +
300 std::to_string(options.level0_file_num_compaction_trigger - 1),
301 iter->value().ToString());
302 iter->Seek("foo");
303 ASSERT_TRUE(iter->Valid());
304 ASSERT_EQ("foo", iter->key().ToString());
305 ASSERT_EQ("foo_value" +
306 std::to_string(options.level0_file_num_compaction_trigger - 1),
307 iter->value().ToString());
308 size_t count = 0;
309 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
310 ++count;
311 }
312 ASSERT_EQ(2, count);
313 delete iter;
314 }
315
316 TEST_F(DBSecondaryTest, MissingTableFile) {
317 int table_files_not_exist = 0;
318 SyncPoint::GetInstance()->DisableProcessing();
319 SyncPoint::GetInstance()->ClearAllCallBacks();
320 SyncPoint::GetInstance()->SetCallBack(
321 "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers",
322 [&](void* arg) {
323 Status s = *reinterpret_cast<Status*>(arg);
324 if (s.IsPathNotFound()) {
325 ++table_files_not_exist;
326 } else if (!s.ok()) {
327 assert(false); // Should not reach here
328 }
329 });
330 SyncPoint::GetInstance()->EnableProcessing();
331 Options options;
332 options.env = env_;
333 options.level0_file_num_compaction_trigger = 4;
334 Reopen(options);
335
336 Options options1;
337 options1.env = env_;
338 options1.max_open_files = -1;
339 OpenSecondary(options1);
340
341 for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
342 ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
343 ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
344 ASSERT_OK(dbfull()->Flush(FlushOptions()));
345 }
346 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
347 ASSERT_OK(dbfull()->TEST_WaitForCompact());
348
349 ASSERT_NE(nullptr, db_secondary_full());
350 ReadOptions ropts;
351 ropts.verify_checksums = true;
352 std::string value;
353 ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
354 ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));
355
356 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
357 ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
358 ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
359 ASSERT_EQ("foo_value" +
360 std::to_string(options.level0_file_num_compaction_trigger - 1),
361 value);
362 ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
363 ASSERT_EQ("bar_value" +
364 std::to_string(options.level0_file_num_compaction_trigger - 1),
365 value);
366 Iterator* iter = db_secondary_->NewIterator(ropts);
367 ASSERT_NE(nullptr, iter);
368 iter->Seek("bar");
369 ASSERT_TRUE(iter->Valid());
370 ASSERT_EQ("bar", iter->key().ToString());
371 ASSERT_EQ("bar_value" +
372 std::to_string(options.level0_file_num_compaction_trigger - 1),
373 iter->value().ToString());
374 iter->Seek("foo");
375 ASSERT_TRUE(iter->Valid());
376 ASSERT_EQ("foo", iter->key().ToString());
377 ASSERT_EQ("foo_value" +
378 std::to_string(options.level0_file_num_compaction_trigger - 1),
379 iter->value().ToString());
380 size_t count = 0;
381 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
382 ++count;
383 }
384 ASSERT_EQ(2, count);
385 delete iter;
386 }
387
388 TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
389 Options options;
390 options.env = env_;
391 const std::string kCfName1 = "pikachu";
392 CreateAndReopenWithCF({kCfName1}, options);
393
394 Options options1;
395 options1.env = env_;
396 options1.max_open_files = -1;
397 OpenSecondaryWithColumnFamilies({kCfName1}, options1);
398 ASSERT_EQ(2, handles_secondary_.size());
399
400 ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
401 ASSERT_OK(Flush(1 /*cf*/));
402
403 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
404 ReadOptions ropts;
405 ropts.verify_checksums = true;
406 std::string value;
407 ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
408 ASSERT_EQ("foo_val_1", value);
409
410 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
411 Close();
412 CheckFileTypeCounts(dbname_, 1, 0, 1);
413 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
414 value.clear();
415 ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
416 ASSERT_EQ("foo_val_1", value);
417 }
418
419 TEST_F(DBSecondaryTest, SwitchManifest) {
420 Options options;
421 options.env = env_;
422 options.level0_file_num_compaction_trigger = 4;
423 Reopen(options);
424
425 Options options1;
426 options1.env = env_;
427 options1.max_open_files = -1;
428 OpenSecondary(options1);
429
430 const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
431 // Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
432 // ..., 9.
433 const int kNumKeys = 10;
434 // Create two sst
435 for (int i = 0; i != kNumFiles; ++i) {
436 for (int j = 0; j != kNumKeys; ++j) {
437 ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
438 }
439 ASSERT_OK(Flush());
440 }
441
442 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
443 const auto& range_scan_db = [&]() {
444 ReadOptions tmp_ropts;
445 tmp_ropts.total_order_seek = true;
446 tmp_ropts.verify_checksums = true;
447 std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
448 int cnt = 0;
449 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
450 ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
451 ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
452 iter->value().ToString());
453 }
454 };
455
456 range_scan_db();
457
458 // While secondary instance still keeps old MANIFEST open, we close primary,
459 // restart primary, performs full compaction, close again, restart again so
460 // that next time secondary tries to catch up with primary, the secondary
461 // will skip the MANIFEST in middle.
462 Reopen(options);
463 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
464 ASSERT_OK(dbfull()->TEST_WaitForCompact());
465
466 Reopen(options);
467 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
468
469 ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
470 range_scan_db();
471 }
472 #endif //! ROCKSDB_LITE
473
474 } // namespace rocksdb
475
476 int main(int argc, char** argv) {
477 rocksdb::port::InstallStackTraceHandler();
478 ::testing::InitGoogleTest(&argc, argv);
479 return RUN_ALL_TESTS();
480 }