]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/external_sst_file_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / external_sst_file_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include <functional>
9#include "db/db_test_util.h"
f67539c2 10#include "file/filename.h"
7c673cae
FG
11#include "port/port.h"
12#include "port/stack_trace.h"
13#include "rocksdb/sst_file_writer.h"
f67539c2
TL
14#include "test_util/fault_injection_test_env.h"
15#include "test_util/testutil.h"
7c673cae 16
f67539c2
TL
17namespace ROCKSDB_NAMESPACE {
18
19// A test environment that can be configured to fail the Link operation.
20class ExternalSSTTestEnv : public EnvWrapper {
21 public:
22 ExternalSSTTestEnv(Env* t, bool fail_link)
23 : EnvWrapper(t), fail_link_(fail_link) {}
24
25 Status LinkFile(const std::string& s, const std::string& t) override {
26 if (fail_link_) {
27 return Status::NotSupported("Link failed");
28 }
29 return target()->LinkFile(s, t);
30 }
31
32 void set_fail_link(bool fail_link) { fail_link_ = fail_link; }
33
34 private:
35 bool fail_link_;
36};
37
38class ExternSSTFileLinkFailFallbackTest
39 : public DBTestBase,
40 public ::testing::WithParamInterface<std::tuple<bool, bool>> {
41 public:
42 ExternSSTFileLinkFailFallbackTest()
43 : DBTestBase("/external_sst_file_test"),
44 test_env_(new ExternalSSTTestEnv(env_, true)) {
45 sst_files_dir_ = dbname_ + "/sst_files/";
46 test::DestroyDir(env_, sst_files_dir_);
47 env_->CreateDir(sst_files_dir_);
48 options_ = CurrentOptions();
49 options_.disable_auto_compactions = true;
50 options_.env = test_env_;
51 }
52
53 void TearDown() override {
54 delete db_;
55 db_ = nullptr;
56 ASSERT_OK(DestroyDB(dbname_, options_));
57 delete test_env_;
58 test_env_ = nullptr;
59 }
60
61 protected:
62 std::string sst_files_dir_;
63 Options options_;
64 ExternalSSTTestEnv* test_env_;
65};
7c673cae 66
494da23a
TL
67class ExternalSSTFileTest
68 : public DBTestBase,
69 public ::testing::WithParamInterface<std::tuple<bool, bool>> {
7c673cae
FG
70 public:
71 ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") {
72 sst_files_dir_ = dbname_ + "/sst_files/";
73 DestroyAndRecreateExternalSSTFilesDir();
74 }
75
76 void DestroyAndRecreateExternalSSTFilesDir() {
77 test::DestroyDir(env_, sst_files_dir_);
78 env_->CreateDir(sst_files_dir_);
79 }
80
494da23a
TL
81 Status GenerateOneExternalFile(
82 const Options& options, ColumnFamilyHandle* cfh,
83 std::vector<std::pair<std::string, std::string>>& data, int file_id,
84 bool sort_data, std::string* external_file_path,
85 std::map<std::string, std::string>* true_data) {
7c673cae 86 // Generate a file id if not provided
494da23a
TL
87 if (-1 == file_id) {
88 file_id = (++last_file_id_);
7c673cae 89 }
7c673cae
FG
90 // Sort data if asked to do so
91 if (sort_data) {
92 std::sort(data.begin(), data.end(),
93 [&](const std::pair<std::string, std::string>& e1,
94 const std::pair<std::string, std::string>& e2) {
95 return options.comparator->Compare(e1.first, e2.first) < 0;
96 });
97 auto uniq_iter = std::unique(
98 data.begin(), data.end(),
99 [&](const std::pair<std::string, std::string>& e1,
100 const std::pair<std::string, std::string>& e2) {
101 return options.comparator->Compare(e1.first, e2.first) == 0;
102 });
103 data.resize(uniq_iter - data.begin());
104 }
105 std::string file_path = sst_files_dir_ + ToString(file_id);
106 SstFileWriter sst_file_writer(EnvOptions(), options, cfh);
7c673cae
FG
107 Status s = sst_file_writer.Open(file_path);
108 if (!s.ok()) {
109 return s;
110 }
494da23a 111 for (const auto& entry : data) {
11fdf7f2 112 s = sst_file_writer.Put(entry.first, entry.second);
7c673cae
FG
113 if (!s.ok()) {
114 sst_file_writer.Finish();
115 return s;
116 }
117 }
118 s = sst_file_writer.Finish();
494da23a
TL
119 if (s.ok() && external_file_path != nullptr) {
120 *external_file_path = file_path;
7c673cae 121 }
494da23a
TL
122 if (s.ok() && nullptr != true_data) {
123 for (const auto& entry : data) {
124 true_data->insert({entry.first, entry.second});
7c673cae
FG
125 }
126 }
7c673cae
FG
127 return s;
128 }
129
494da23a
TL
130 Status GenerateAndAddExternalFile(
131 const Options options,
11fdf7f2 132 std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
494da23a
TL
133 bool allow_global_seqno = false, bool write_global_seqno = false,
134 bool verify_checksums_before_ingest = true, bool ingest_behind = false,
11fdf7f2
TL
135 bool sort_data = false,
136 std::map<std::string, std::string>* true_data = nullptr,
137 ColumnFamilyHandle* cfh = nullptr) {
138 // Generate a file id if not provided
139 if (file_id == -1) {
140 file_id = last_file_id_ + 1;
141 last_file_id_++;
142 }
143
144 // Sort data if asked to do so
145 if (sort_data) {
146 std::sort(data.begin(), data.end(),
147 [&](const std::pair<std::string, std::string>& e1,
148 const std::pair<std::string, std::string>& e2) {
149 return options.comparator->Compare(e1.first, e2.first) < 0;
150 });
151 auto uniq_iter = std::unique(
152 data.begin(), data.end(),
153 [&](const std::pair<std::string, std::string>& e1,
154 const std::pair<std::string, std::string>& e2) {
155 return options.comparator->Compare(e1.first, e2.first) == 0;
156 });
157 data.resize(uniq_iter - data.begin());
158 }
159 std::string file_path = sst_files_dir_ + ToString(file_id);
160 SstFileWriter sst_file_writer(EnvOptions(), options, cfh);
161
162 Status s = sst_file_writer.Open(file_path);
163 if (!s.ok()) {
164 return s;
165 }
166 for (auto& entry : data) {
167 s = sst_file_writer.Put(entry.first, entry.second);
168 if (!s.ok()) {
169 sst_file_writer.Finish();
170 return s;
171 }
172 }
173 s = sst_file_writer.Finish();
174
175 if (s.ok()) {
494da23a
TL
176 IngestExternalFileOptions ifo;
177 ifo.allow_global_seqno = allow_global_seqno;
178 ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false;
179 ifo.verify_checksums_before_ingest = verify_checksums_before_ingest;
180 ifo.ingest_behind = ingest_behind;
11fdf7f2
TL
181 if (cfh) {
182 s = db_->IngestExternalFile(cfh, {file_path}, ifo);
183 } else {
184 s = db_->IngestExternalFile({file_path}, ifo);
185 }
186 }
187
188 if (s.ok() && true_data) {
189 for (auto& entry : data) {
190 (*true_data)[entry.first] = entry.second;
191 }
192 }
193
194 return s;
195 }
196
494da23a
TL
197 Status GenerateAndAddExternalFiles(
198 const Options& options,
199 const std::vector<ColumnFamilyHandle*>& column_families,
200 const std::vector<IngestExternalFileOptions>& ifos,
201 std::vector<std::vector<std::pair<std::string, std::string>>>& data,
202 int file_id, bool sort_data,
203 std::vector<std::map<std::string, std::string>>& true_data) {
204 if (-1 == file_id) {
205 file_id = (++last_file_id_);
206 }
207 // Generate external SST files, one for each column family
208 size_t num_cfs = column_families.size();
209 assert(ifos.size() == num_cfs);
210 assert(data.size() == num_cfs);
211 Status s;
212 std::vector<IngestExternalFileArg> args(num_cfs);
213 for (size_t i = 0; i != num_cfs; ++i) {
214 std::string external_file_path;
215 s = GenerateOneExternalFile(
216 options, column_families[i], data[i], file_id, sort_data,
217 &external_file_path,
218 true_data.size() == num_cfs ? &true_data[i] : nullptr);
219 if (!s.ok()) {
220 return s;
221 }
222 ++file_id;
11fdf7f2 223
494da23a
TL
224 args[i].column_family = column_families[i];
225 args[i].external_files.push_back(external_file_path);
226 args[i].options = ifos[i];
227 }
228 s = db_->IngestExternalFiles(args);
229 return s;
230 }
11fdf7f2 231
7c673cae
FG
232 Status GenerateAndAddExternalFile(
233 const Options options, std::vector<std::pair<int, std::string>> data,
494da23a
TL
234 int file_id = -1, bool allow_global_seqno = false,
235 bool write_global_seqno = false,
236 bool verify_checksums_before_ingest = true, bool ingest_behind = false,
237 bool sort_data = false,
7c673cae
FG
238 std::map<std::string, std::string>* true_data = nullptr,
239 ColumnFamilyHandle* cfh = nullptr) {
240 std::vector<std::pair<std::string, std::string>> file_data;
241 for (auto& entry : data) {
242 file_data.emplace_back(Key(entry.first), entry.second);
243 }
244 return GenerateAndAddExternalFile(options, file_data, file_id,
494da23a
TL
245 allow_global_seqno, write_global_seqno,
246 verify_checksums_before_ingest,
247 ingest_behind, sort_data, true_data, cfh);
7c673cae
FG
248 }
249
250 Status GenerateAndAddExternalFile(
251 const Options options, std::vector<int> keys, int file_id = -1,
494da23a
TL
252 bool allow_global_seqno = false, bool write_global_seqno = false,
253 bool verify_checksums_before_ingest = true, bool ingest_behind = false,
254 bool sort_data = false,
7c673cae
FG
255 std::map<std::string, std::string>* true_data = nullptr,
256 ColumnFamilyHandle* cfh = nullptr) {
257 std::vector<std::pair<std::string, std::string>> file_data;
258 for (auto& k : keys) {
259 file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
260 }
261 return GenerateAndAddExternalFile(options, file_data, file_id,
494da23a
TL
262 allow_global_seqno, write_global_seqno,
263 verify_checksums_before_ingest,
264 ingest_behind, sort_data, true_data, cfh);
7c673cae
FG
265 }
266
267 Status DeprecatedAddFile(const std::vector<std::string>& files,
268 bool move_files = false,
494da23a
TL
269 bool skip_snapshot_check = false,
270 bool skip_write_global_seqno = false) {
7c673cae
FG
271 IngestExternalFileOptions opts;
272 opts.move_files = move_files;
273 opts.snapshot_consistency = !skip_snapshot_check;
274 opts.allow_global_seqno = false;
275 opts.allow_blocking_flush = false;
494da23a 276 opts.write_global_seqno = !skip_write_global_seqno;
7c673cae
FG
277 return db_->IngestExternalFile(files, opts);
278 }
279
494da23a 280 ~ExternalSSTFileTest() override { test::DestroyDir(env_, sst_files_dir_); }
7c673cae
FG
281
282 protected:
283 int last_file_id_ = 0;
284 std::string sst_files_dir_;
285};
286
287TEST_F(ExternalSSTFileTest, Basic) {
288 do {
289 Options options = CurrentOptions();
290
291 SstFileWriter sst_file_writer(EnvOptions(), options);
292
293 // Current file size should be 0 after sst_file_writer init and before open a file.
294 ASSERT_EQ(sst_file_writer.FileSize(), 0);
295
296 // file1.sst (0 => 99)
297 std::string file1 = sst_files_dir_ + "file1.sst";
298 ASSERT_OK(sst_file_writer.Open(file1));
299 for (int k = 0; k < 100; k++) {
11fdf7f2 300 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
301 }
302 ExternalSstFileInfo file1_info;
303 Status s = sst_file_writer.Finish(&file1_info);
304 ASSERT_TRUE(s.ok()) << s.ToString();
305
306 // Current file size should be non-zero after success write.
307 ASSERT_GT(sst_file_writer.FileSize(), 0);
308
309 ASSERT_EQ(file1_info.file_path, file1);
310 ASSERT_EQ(file1_info.num_entries, 100);
311 ASSERT_EQ(file1_info.smallest_key, Key(0));
312 ASSERT_EQ(file1_info.largest_key, Key(99));
11fdf7f2
TL
313 ASSERT_EQ(file1_info.num_range_del_entries, 0);
314 ASSERT_EQ(file1_info.smallest_range_del_key, "");
315 ASSERT_EQ(file1_info.largest_range_del_key, "");
7c673cae 316 // sst_file_writer already finished, cannot add this value
11fdf7f2 317 s = sst_file_writer.Put(Key(100), "bad_val");
7c673cae
FG
318 ASSERT_FALSE(s.ok()) << s.ToString();
319
320 // file2.sst (100 => 199)
321 std::string file2 = sst_files_dir_ + "file2.sst";
322 ASSERT_OK(sst_file_writer.Open(file2));
323 for (int k = 100; k < 200; k++) {
11fdf7f2 324 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
325 }
326 // Cannot add this key because it's not after last added key
11fdf7f2 327 s = sst_file_writer.Put(Key(99), "bad_val");
7c673cae
FG
328 ASSERT_FALSE(s.ok()) << s.ToString();
329 ExternalSstFileInfo file2_info;
330 s = sst_file_writer.Finish(&file2_info);
331 ASSERT_TRUE(s.ok()) << s.ToString();
332 ASSERT_EQ(file2_info.file_path, file2);
333 ASSERT_EQ(file2_info.num_entries, 100);
334 ASSERT_EQ(file2_info.smallest_key, Key(100));
335 ASSERT_EQ(file2_info.largest_key, Key(199));
336
337 // file3.sst (195 => 299)
338 // This file values overlap with file2 values
339 std::string file3 = sst_files_dir_ + "file3.sst";
340 ASSERT_OK(sst_file_writer.Open(file3));
341 for (int k = 195; k < 300; k++) {
11fdf7f2 342 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
7c673cae
FG
343 }
344 ExternalSstFileInfo file3_info;
345 s = sst_file_writer.Finish(&file3_info);
346
347 ASSERT_TRUE(s.ok()) << s.ToString();
348 // Current file size should be non-zero after success finish.
349 ASSERT_GT(sst_file_writer.FileSize(), 0);
350 ASSERT_EQ(file3_info.file_path, file3);
351 ASSERT_EQ(file3_info.num_entries, 105);
352 ASSERT_EQ(file3_info.smallest_key, Key(195));
353 ASSERT_EQ(file3_info.largest_key, Key(299));
354
355 // file4.sst (30 => 39)
356 // This file values overlap with file1 values
357 std::string file4 = sst_files_dir_ + "file4.sst";
358 ASSERT_OK(sst_file_writer.Open(file4));
359 for (int k = 30; k < 40; k++) {
11fdf7f2 360 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
7c673cae
FG
361 }
362 ExternalSstFileInfo file4_info;
363 s = sst_file_writer.Finish(&file4_info);
364 ASSERT_TRUE(s.ok()) << s.ToString();
365 ASSERT_EQ(file4_info.file_path, file4);
366 ASSERT_EQ(file4_info.num_entries, 10);
367 ASSERT_EQ(file4_info.smallest_key, Key(30));
368 ASSERT_EQ(file4_info.largest_key, Key(39));
369
370 // file5.sst (400 => 499)
371 std::string file5 = sst_files_dir_ + "file5.sst";
372 ASSERT_OK(sst_file_writer.Open(file5));
373 for (int k = 400; k < 500; k++) {
11fdf7f2 374 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
375 }
376 ExternalSstFileInfo file5_info;
377 s = sst_file_writer.Finish(&file5_info);
378 ASSERT_TRUE(s.ok()) << s.ToString();
379 ASSERT_EQ(file5_info.file_path, file5);
380 ASSERT_EQ(file5_info.num_entries, 100);
381 ASSERT_EQ(file5_info.smallest_key, Key(400));
382 ASSERT_EQ(file5_info.largest_key, Key(499));
383
11fdf7f2
TL
384 // file6.sst (delete 400 => 500)
385 std::string file6 = sst_files_dir_ + "file6.sst";
386 ASSERT_OK(sst_file_writer.Open(file6));
387 sst_file_writer.DeleteRange(Key(400), Key(500));
388 ExternalSstFileInfo file6_info;
389 s = sst_file_writer.Finish(&file6_info);
390 ASSERT_TRUE(s.ok()) << s.ToString();
391 ASSERT_EQ(file6_info.file_path, file6);
392 ASSERT_EQ(file6_info.num_entries, 0);
393 ASSERT_EQ(file6_info.smallest_key, "");
394 ASSERT_EQ(file6_info.largest_key, "");
395 ASSERT_EQ(file6_info.num_range_del_entries, 1);
396 ASSERT_EQ(file6_info.smallest_range_del_key, Key(400));
397 ASSERT_EQ(file6_info.largest_range_del_key, Key(500));
398
399 // file7.sst (delete 500 => 570, put 520 => 599 divisible by 2)
400 std::string file7 = sst_files_dir_ + "file7.sst";
401 ASSERT_OK(sst_file_writer.Open(file7));
402 sst_file_writer.DeleteRange(Key(500), Key(550));
403 for (int k = 520; k < 560; k += 2) {
404 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
405 }
406 sst_file_writer.DeleteRange(Key(525), Key(575));
407 for (int k = 560; k < 600; k += 2) {
408 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
409 }
410 ExternalSstFileInfo file7_info;
411 s = sst_file_writer.Finish(&file7_info);
412 ASSERT_TRUE(s.ok()) << s.ToString();
413 ASSERT_EQ(file7_info.file_path, file7);
414 ASSERT_EQ(file7_info.num_entries, 40);
415 ASSERT_EQ(file7_info.smallest_key, Key(520));
416 ASSERT_EQ(file7_info.largest_key, Key(598));
417 ASSERT_EQ(file7_info.num_range_del_entries, 2);
418 ASSERT_EQ(file7_info.smallest_range_del_key, Key(500));
419 ASSERT_EQ(file7_info.largest_range_del_key, Key(575));
420
421 // file8.sst (delete 600 => 700)
422 std::string file8 = sst_files_dir_ + "file8.sst";
423 ASSERT_OK(sst_file_writer.Open(file8));
424 sst_file_writer.DeleteRange(Key(600), Key(700));
425 ExternalSstFileInfo file8_info;
426 s = sst_file_writer.Finish(&file8_info);
427 ASSERT_TRUE(s.ok()) << s.ToString();
428 ASSERT_EQ(file8_info.file_path, file8);
429 ASSERT_EQ(file8_info.num_entries, 0);
430 ASSERT_EQ(file8_info.smallest_key, "");
431 ASSERT_EQ(file8_info.largest_key, "");
432 ASSERT_EQ(file8_info.num_range_del_entries, 1);
433 ASSERT_EQ(file8_info.smallest_range_del_key, Key(600));
434 ASSERT_EQ(file8_info.largest_range_del_key, Key(700));
435
7c673cae
FG
436 // Cannot create an empty sst file
437 std::string file_empty = sst_files_dir_ + "file_empty.sst";
438 ExternalSstFileInfo file_empty_info;
439 s = sst_file_writer.Finish(&file_empty_info);
440 ASSERT_NOK(s);
441
442 DestroyAndReopen(options);
443 // Add file using file path
444 s = DeprecatedAddFile({file1});
445 ASSERT_TRUE(s.ok()) << s.ToString();
446 ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
447 for (int k = 0; k < 100; k++) {
448 ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
449 }
450
451 // Add file while holding a snapshot will fail
452 const Snapshot* s1 = db_->GetSnapshot();
453 if (s1 != nullptr) {
454 ASSERT_NOK(DeprecatedAddFile({file2}));
455 db_->ReleaseSnapshot(s1);
456 }
457 // We can add the file after releaseing the snapshot
458 ASSERT_OK(DeprecatedAddFile({file2}));
459
460 ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
461 for (int k = 0; k < 200; k++) {
462 ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
463 }
464
465 // This file has overlapping values with the existing data
466 s = DeprecatedAddFile({file3});
467 ASSERT_FALSE(s.ok()) << s.ToString();
468
469 // This file has overlapping values with the existing data
470 s = DeprecatedAddFile({file4});
471 ASSERT_FALSE(s.ok()) << s.ToString();
472
473 // Overwrite values of keys divisible by 5
474 for (int k = 0; k < 200; k += 5) {
475 ASSERT_OK(Put(Key(k), Key(k) + "_val_new"));
476 }
477 ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);
478
479 // Key range of file5 (400 => 499) dont overlap with any keys in DB
480 ASSERT_OK(DeprecatedAddFile({file5}));
481
11fdf7f2
TL
482 // This file has overlapping values with the existing data
483 s = DeprecatedAddFile({file6});
484 ASSERT_FALSE(s.ok()) << s.ToString();
485
486 // Key range of file7 (500 => 598) dont overlap with any keys in DB
487 ASSERT_OK(DeprecatedAddFile({file7}));
488
489 // Key range of file7 (600 => 700) dont overlap with any keys in DB
490 ASSERT_OK(DeprecatedAddFile({file8}));
491
7c673cae
FG
492 // Make sure values are correct before and after flush/compaction
493 for (int i = 0; i < 2; i++) {
494 for (int k = 0; k < 200; k++) {
495 std::string value = Key(k) + "_val";
496 if (k % 5 == 0) {
497 value += "_new";
498 }
499 ASSERT_EQ(Get(Key(k)), value);
500 }
501 for (int k = 400; k < 500; k++) {
502 std::string value = Key(k) + "_val";
503 ASSERT_EQ(Get(Key(k)), value);
504 }
11fdf7f2
TL
505 for (int k = 500; k < 600; k++) {
506 std::string value = Key(k) + "_val";
507 if (k < 520 || k % 2 == 1) {
508 value = "NOT_FOUND";
509 }
510 ASSERT_EQ(Get(Key(k)), value);
511 }
7c673cae
FG
512 ASSERT_OK(Flush());
513 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
514 }
515
516 Close();
517 options.disable_auto_compactions = true;
518 Reopen(options);
519
520 // Delete keys in range (400 => 499)
521 for (int k = 400; k < 500; k++) {
522 ASSERT_OK(Delete(Key(k)));
523 }
524 // We deleted range (400 => 499) but cannot add file5 because
525 // of the range tombstones
526 ASSERT_NOK(DeprecatedAddFile({file5}));
527
528 // Compacting the DB will remove the tombstones
529 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
530
531 // Now we can add the file
532 ASSERT_OK(DeprecatedAddFile({file5}));
533
534 // Verify values of file5 in DB
535 for (int k = 400; k < 500; k++) {
536 std::string value = Key(k) + "_val";
537 ASSERT_EQ(Get(Key(k)), value);
538 }
539 DestroyAndRecreateExternalSSTFilesDir();
11fdf7f2
TL
540 } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
541 kRangeDelSkipConfigs));
7c673cae 542}
494da23a 543
7c673cae
FG
544class SstFileWriterCollector : public TablePropertiesCollector {
545 public:
546 explicit SstFileWriterCollector(const std::string prefix) : prefix_(prefix) {
547 name_ = prefix_ + "_SstFileWriterCollector";
548 }
549
550 const char* Name() const override { return name_.c_str(); }
551
552 Status Finish(UserCollectedProperties* properties) override {
11fdf7f2 553 std::string count = std::to_string(count_);
7c673cae
FG
554 *properties = UserCollectedProperties{
555 {prefix_ + "_SstFileWriterCollector", "YES"},
11fdf7f2 556 {prefix_ + "_Count", count},
7c673cae
FG
557 };
558 return Status::OK();
559 }
560
11fdf7f2
TL
561 Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
562 EntryType /*type*/, SequenceNumber /*seq*/,
563 uint64_t /*file_size*/) override {
7c673cae
FG
564 ++count_;
565 return Status::OK();
566 }
567
494da23a 568 UserCollectedProperties GetReadableProperties() const override {
7c673cae
FG
569 return UserCollectedProperties{};
570 }
571
572 private:
573 uint32_t count_ = 0;
574 std::string prefix_;
575 std::string name_;
576};
577
578class SstFileWriterCollectorFactory : public TablePropertiesCollectorFactory {
579 public:
580 explicit SstFileWriterCollectorFactory(std::string prefix)
581 : prefix_(prefix), num_created_(0) {}
494da23a 582 TablePropertiesCollector* CreateTablePropertiesCollector(
11fdf7f2 583 TablePropertiesCollectorFactory::Context /*context*/) override {
7c673cae
FG
584 num_created_++;
585 return new SstFileWriterCollector(prefix_);
586 }
587 const char* Name() const override { return "SstFileWriterCollectorFactory"; }
588
589 std::string prefix_;
590 uint32_t num_created_;
591};
592
593TEST_F(ExternalSSTFileTest, AddList) {
594 do {
595 Options options = CurrentOptions();
596
597 auto abc_collector = std::make_shared<SstFileWriterCollectorFactory>("abc");
598 auto xyz_collector = std::make_shared<SstFileWriterCollectorFactory>("xyz");
599
600 options.table_properties_collector_factories.emplace_back(abc_collector);
601 options.table_properties_collector_factories.emplace_back(xyz_collector);
602
603 SstFileWriter sst_file_writer(EnvOptions(), options);
604
605 // file1.sst (0 => 99)
606 std::string file1 = sst_files_dir_ + "file1.sst";
607 ASSERT_OK(sst_file_writer.Open(file1));
608 for (int k = 0; k < 100; k++) {
11fdf7f2 609 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
610 }
611 ExternalSstFileInfo file1_info;
612 Status s = sst_file_writer.Finish(&file1_info);
613 ASSERT_TRUE(s.ok()) << s.ToString();
614 ASSERT_EQ(file1_info.file_path, file1);
615 ASSERT_EQ(file1_info.num_entries, 100);
616 ASSERT_EQ(file1_info.smallest_key, Key(0));
617 ASSERT_EQ(file1_info.largest_key, Key(99));
618 // sst_file_writer already finished, cannot add this value
11fdf7f2 619 s = sst_file_writer.Put(Key(100), "bad_val");
7c673cae
FG
620 ASSERT_FALSE(s.ok()) << s.ToString();
621
622 // file2.sst (100 => 199)
623 std::string file2 = sst_files_dir_ + "file2.sst";
624 ASSERT_OK(sst_file_writer.Open(file2));
625 for (int k = 100; k < 200; k++) {
11fdf7f2 626 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
627 }
628 // Cannot add this key because it's not after last added key
11fdf7f2 629 s = sst_file_writer.Put(Key(99), "bad_val");
7c673cae
FG
630 ASSERT_FALSE(s.ok()) << s.ToString();
631 ExternalSstFileInfo file2_info;
632 s = sst_file_writer.Finish(&file2_info);
633 ASSERT_TRUE(s.ok()) << s.ToString();
634 ASSERT_EQ(file2_info.file_path, file2);
635 ASSERT_EQ(file2_info.num_entries, 100);
636 ASSERT_EQ(file2_info.smallest_key, Key(100));
637 ASSERT_EQ(file2_info.largest_key, Key(199));
638
639 // file3.sst (195 => 199)
640 // This file values overlap with file2 values
641 std::string file3 = sst_files_dir_ + "file3.sst";
642 ASSERT_OK(sst_file_writer.Open(file3));
643 for (int k = 195; k < 200; k++) {
11fdf7f2 644 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
7c673cae
FG
645 }
646 ExternalSstFileInfo file3_info;
647 s = sst_file_writer.Finish(&file3_info);
648 ASSERT_TRUE(s.ok()) << s.ToString();
649 ASSERT_EQ(file3_info.file_path, file3);
650 ASSERT_EQ(file3_info.num_entries, 5);
651 ASSERT_EQ(file3_info.smallest_key, Key(195));
652 ASSERT_EQ(file3_info.largest_key, Key(199));
653
654 // file4.sst (30 => 39)
655 // This file values overlap with file1 values
656 std::string file4 = sst_files_dir_ + "file4.sst";
657 ASSERT_OK(sst_file_writer.Open(file4));
658 for (int k = 30; k < 40; k++) {
11fdf7f2 659 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap"));
7c673cae
FG
660 }
661 ExternalSstFileInfo file4_info;
662 s = sst_file_writer.Finish(&file4_info);
663 ASSERT_TRUE(s.ok()) << s.ToString();
664 ASSERT_EQ(file4_info.file_path, file4);
665 ASSERT_EQ(file4_info.num_entries, 10);
666 ASSERT_EQ(file4_info.smallest_key, Key(30));
667 ASSERT_EQ(file4_info.largest_key, Key(39));
668
669 // file5.sst (200 => 299)
670 std::string file5 = sst_files_dir_ + "file5.sst";
671 ASSERT_OK(sst_file_writer.Open(file5));
672 for (int k = 200; k < 300; k++) {
11fdf7f2 673 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
674 }
675 ExternalSstFileInfo file5_info;
676 s = sst_file_writer.Finish(&file5_info);
677 ASSERT_TRUE(s.ok()) << s.ToString();
678 ASSERT_EQ(file5_info.file_path, file5);
679 ASSERT_EQ(file5_info.num_entries, 100);
680 ASSERT_EQ(file5_info.smallest_key, Key(200));
681 ASSERT_EQ(file5_info.largest_key, Key(299));
682
11fdf7f2
TL
683 // file6.sst (delete 0 => 100)
684 std::string file6 = sst_files_dir_ + "file6.sst";
685 ASSERT_OK(sst_file_writer.Open(file6));
686 ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75)));
687 ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100)));
688 ExternalSstFileInfo file6_info;
689 s = sst_file_writer.Finish(&file6_info);
690 ASSERT_TRUE(s.ok()) << s.ToString();
691 ASSERT_EQ(file6_info.file_path, file6);
692 ASSERT_EQ(file6_info.num_entries, 0);
693 ASSERT_EQ(file6_info.smallest_key, "");
694 ASSERT_EQ(file6_info.largest_key, "");
695 ASSERT_EQ(file6_info.num_range_del_entries, 2);
696 ASSERT_EQ(file6_info.smallest_range_del_key, Key(0));
697 ASSERT_EQ(file6_info.largest_range_del_key, Key(100));
698
f67539c2 699 // file7.sst (delete 99 => 201)
11fdf7f2
TL
700 std::string file7 = sst_files_dir_ + "file7.sst";
701 ASSERT_OK(sst_file_writer.Open(file7));
f67539c2 702 ASSERT_OK(sst_file_writer.DeleteRange(Key(99), Key(201)));
11fdf7f2
TL
703 ExternalSstFileInfo file7_info;
704 s = sst_file_writer.Finish(&file7_info);
705 ASSERT_TRUE(s.ok()) << s.ToString();
706 ASSERT_EQ(file7_info.file_path, file7);
707 ASSERT_EQ(file7_info.num_entries, 0);
708 ASSERT_EQ(file7_info.smallest_key, "");
709 ASSERT_EQ(file7_info.largest_key, "");
710 ASSERT_EQ(file7_info.num_range_del_entries, 1);
f67539c2
TL
711 ASSERT_EQ(file7_info.smallest_range_del_key, Key(99));
712 ASSERT_EQ(file7_info.largest_range_del_key, Key(201));
11fdf7f2 713
7c673cae
FG
714 // list 1 has internal key range conflict
715 std::vector<std::string> file_list0({file1, file2});
716 std::vector<std::string> file_list1({file3, file2, file1});
717 std::vector<std::string> file_list2({file5});
718 std::vector<std::string> file_list3({file3, file4});
11fdf7f2
TL
719 std::vector<std::string> file_list4({file5, file7});
720 std::vector<std::string> file_list5({file6, file7});
7c673cae
FG
721
722 DestroyAndReopen(options);
723
11fdf7f2 724 // These lists of files have key ranges that overlap with each other
7c673cae
FG
725 s = DeprecatedAddFile(file_list1);
726 ASSERT_FALSE(s.ok()) << s.ToString();
f67539c2 727 // Both of the following overlap on the range deletion tombstone.
11fdf7f2
TL
728 s = DeprecatedAddFile(file_list4);
729 ASSERT_FALSE(s.ok()) << s.ToString();
730 s = DeprecatedAddFile(file_list5);
731 ASSERT_FALSE(s.ok()) << s.ToString();
7c673cae
FG
732
733 // Add files using file path list
734 s = DeprecatedAddFile(file_list0);
735 ASSERT_TRUE(s.ok()) << s.ToString();
736 ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
737 for (int k = 0; k < 200; k++) {
738 ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
739 }
740
741 TablePropertiesCollection props;
742 ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
743 ASSERT_EQ(props.size(), 2);
744 for (auto file_props : props) {
745 auto user_props = file_props.second->user_collected_properties;
746 ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
747 ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
748 ASSERT_EQ(user_props["abc_Count"], "100");
749 ASSERT_EQ(user_props["xyz_Count"], "100");
750 }
751
752 // Add file while holding a snapshot will fail
753 const Snapshot* s1 = db_->GetSnapshot();
754 if (s1 != nullptr) {
755 ASSERT_NOK(DeprecatedAddFile(file_list2));
756 db_->ReleaseSnapshot(s1);
757 }
758 // We can add the file after releaseing the snapshot
759 ASSERT_OK(DeprecatedAddFile(file_list2));
760 ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
761 for (int k = 0; k < 300; k++) {
762 ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
763 }
764
765 ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
766 ASSERT_EQ(props.size(), 3);
767 for (auto file_props : props) {
768 auto user_props = file_props.second->user_collected_properties;
769 ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
770 ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
771 ASSERT_EQ(user_props["abc_Count"], "100");
772 ASSERT_EQ(user_props["xyz_Count"], "100");
773 }
774
775 // This file list has overlapping values with the existing data
776 s = DeprecatedAddFile(file_list3);
777 ASSERT_FALSE(s.ok()) << s.ToString();
778
779 // Overwrite values of keys divisible by 5
780 for (int k = 0; k < 200; k += 5) {
781 ASSERT_OK(Put(Key(k), Key(k) + "_val_new"));
782 }
783 ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);
784
785 // Make sure values are correct before and after flush/compaction
786 for (int i = 0; i < 2; i++) {
787 for (int k = 0; k < 200; k++) {
788 std::string value = Key(k) + "_val";
789 if (k % 5 == 0) {
790 value += "_new";
791 }
792 ASSERT_EQ(Get(Key(k)), value);
793 }
794 for (int k = 200; k < 300; k++) {
795 std::string value = Key(k) + "_val";
796 ASSERT_EQ(Get(Key(k)), value);
797 }
798 ASSERT_OK(Flush());
799 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
800 }
801
802 // Delete keys in range (200 => 299)
803 for (int k = 200; k < 300; k++) {
804 ASSERT_OK(Delete(Key(k)));
805 }
806 // We deleted range (200 => 299) but cannot add file5 because
807 // of the range tombstones
808 ASSERT_NOK(DeprecatedAddFile(file_list2));
809
810 // Compacting the DB will remove the tombstones
811 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
812
813 // Now we can add the file
814 ASSERT_OK(DeprecatedAddFile(file_list2));
815
816 // Verify values of file5 in DB
817 for (int k = 200; k < 300; k++) {
818 std::string value = Key(k) + "_val";
819 ASSERT_EQ(Get(Key(k)), value);
820 }
821 DestroyAndRecreateExternalSSTFilesDir();
11fdf7f2
TL
822 } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
823 kRangeDelSkipConfigs));
7c673cae
FG
824}
825
826TEST_F(ExternalSSTFileTest, AddListAtomicity) {
827 do {
828 Options options = CurrentOptions();
829
830 SstFileWriter sst_file_writer(EnvOptions(), options);
831
832 // files[0].sst (0 => 99)
833 // files[1].sst (100 => 199)
834 // ...
835 // file[8].sst (800 => 899)
836 int n = 9;
837 std::vector<std::string> files(n);
838 std::vector<ExternalSstFileInfo> files_info(n);
839 for (int i = 0; i < n; i++) {
840 files[i] = sst_files_dir_ + "file" + std::to_string(i) + ".sst";
841 ASSERT_OK(sst_file_writer.Open(files[i]));
842 for (int k = i * 100; k < (i + 1) * 100; k++) {
11fdf7f2 843 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
844 }
845 Status s = sst_file_writer.Finish(&files_info[i]);
846 ASSERT_TRUE(s.ok()) << s.ToString();
847 ASSERT_EQ(files_info[i].file_path, files[i]);
848 ASSERT_EQ(files_info[i].num_entries, 100);
849 ASSERT_EQ(files_info[i].smallest_key, Key(i * 100));
850 ASSERT_EQ(files_info[i].largest_key, Key((i + 1) * 100 - 1));
851 }
852 files.push_back(sst_files_dir_ + "file" + std::to_string(n) + ".sst");
853 auto s = DeprecatedAddFile(files);
854 ASSERT_NOK(s) << s.ToString();
855 for (int k = 0; k < n * 100; k++) {
856 ASSERT_EQ("NOT_FOUND", Get(Key(k)));
857 }
858 files.pop_back();
859 ASSERT_OK(DeprecatedAddFile(files));
860 for (int k = 0; k < n * 100; k++) {
861 std::string value = Key(k) + "_val";
862 ASSERT_EQ(Get(Key(k)), value);
863 }
864 DestroyAndRecreateExternalSSTFilesDir();
865 } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
866}
867// This test reporduce a bug that can happen in some cases if the DB started
868// purging obsolete files when we are adding an external sst file.
869// This situation may result in deleting the file while it's being added.
870TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) {
871 Options options = CurrentOptions();
872 SstFileWriter sst_file_writer(EnvOptions(), options);
873
874 // file1.sst (0 => 500)
875 std::string sst_file_path = sst_files_dir_ + "file1.sst";
876 Status s = sst_file_writer.Open(sst_file_path);
877 ASSERT_OK(s);
878 for (int i = 0; i < 500; i++) {
879 std::string k = Key(i);
11fdf7f2 880 s = sst_file_writer.Put(k, k + "_val");
7c673cae
FG
881 ASSERT_OK(s);
882 }
883
884 ExternalSstFileInfo sst_file_info;
885 s = sst_file_writer.Finish(&sst_file_info);
886 ASSERT_OK(s);
887
888 options.delete_obsolete_files_period_micros = 0;
889 options.disable_auto_compactions = true;
890 DestroyAndReopen(options);
891
f67539c2 892 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
11fdf7f2 893 "ExternalSstFileIngestionJob::Prepare:FileAdded", [&](void* /* arg */) {
7c673cae
FG
894 ASSERT_OK(Put("aaa", "bbb"));
895 ASSERT_OK(Flush());
896 ASSERT_OK(Put("aaa", "xxx"));
897 ASSERT_OK(Flush());
898 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
899 });
f67539c2 900 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
901
902 s = DeprecatedAddFile({sst_file_path});
903 ASSERT_OK(s);
904
905 for (int i = 0; i < 500; i++) {
906 std::string k = Key(i);
907 std::string v = k + "_val";
908 ASSERT_EQ(Get(k), v);
909 }
910
f67539c2 911 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
912}
913
914TEST_F(ExternalSSTFileTest, SkipSnapshot) {
915 Options options = CurrentOptions();
916
917 SstFileWriter sst_file_writer(EnvOptions(), options);
918
919 // file1.sst (0 => 99)
920 std::string file1 = sst_files_dir_ + "file1.sst";
921 ASSERT_OK(sst_file_writer.Open(file1));
922 for (int k = 0; k < 100; k++) {
11fdf7f2 923 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
924 }
925 ExternalSstFileInfo file1_info;
926 Status s = sst_file_writer.Finish(&file1_info);
927 ASSERT_TRUE(s.ok()) << s.ToString();
928 ASSERT_EQ(file1_info.file_path, file1);
929 ASSERT_EQ(file1_info.num_entries, 100);
930 ASSERT_EQ(file1_info.smallest_key, Key(0));
931 ASSERT_EQ(file1_info.largest_key, Key(99));
932
933 // file2.sst (100 => 299)
934 std::string file2 = sst_files_dir_ + "file2.sst";
935 ASSERT_OK(sst_file_writer.Open(file2));
936 for (int k = 100; k < 300; k++) {
11fdf7f2 937 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
938 }
939 ExternalSstFileInfo file2_info;
940 s = sst_file_writer.Finish(&file2_info);
941 ASSERT_TRUE(s.ok()) << s.ToString();
942 ASSERT_EQ(file2_info.file_path, file2);
943 ASSERT_EQ(file2_info.num_entries, 200);
944 ASSERT_EQ(file2_info.smallest_key, Key(100));
945 ASSERT_EQ(file2_info.largest_key, Key(299));
946
947 ASSERT_OK(DeprecatedAddFile({file1}));
948
949 // Add file will fail when holding snapshot and use the default
950 // skip_snapshot_check to false
951 const Snapshot* s1 = db_->GetSnapshot();
952 if (s1 != nullptr) {
953 ASSERT_NOK(DeprecatedAddFile({file2}));
954 }
955
956 // Add file will success when set skip_snapshot_check to true even db holding
957 // snapshot
958 if (s1 != nullptr) {
959 ASSERT_OK(DeprecatedAddFile({file2}, false, true));
960 db_->ReleaseSnapshot(s1);
961 }
962
963 // file3.sst (300 => 399)
964 std::string file3 = sst_files_dir_ + "file3.sst";
965 ASSERT_OK(sst_file_writer.Open(file3));
966 for (int k = 300; k < 400; k++) {
11fdf7f2 967 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
7c673cae
FG
968 }
969 ExternalSstFileInfo file3_info;
970 s = sst_file_writer.Finish(&file3_info);
971 ASSERT_TRUE(s.ok()) << s.ToString();
972 ASSERT_EQ(file3_info.file_path, file3);
973 ASSERT_EQ(file3_info.num_entries, 100);
974 ASSERT_EQ(file3_info.smallest_key, Key(300));
975 ASSERT_EQ(file3_info.largest_key, Key(399));
976
977 // check that we have change the old key
978 ASSERT_EQ(Get(Key(300)), "NOT_FOUND");
979 const Snapshot* s2 = db_->GetSnapshot();
980 ASSERT_OK(DeprecatedAddFile({file3}, false, true));
981 ASSERT_EQ(Get(Key(300)), Key(300) + ("_val"));
982 ASSERT_EQ(Get(Key(300), s2), Key(300) + ("_val"));
983
984 db_->ReleaseSnapshot(s2);
985}
986
987TEST_F(ExternalSSTFileTest, MultiThreaded) {
988 // Bulk load 10 files every file contain 1000 keys
989 int num_files = 10;
990 int keys_per_file = 1000;
991
992 // Generate file names
993 std::vector<std::string> file_names;
994 for (int i = 0; i < num_files; i++) {
995 std::string file_name = "file_" + ToString(i) + ".sst";
996 file_names.push_back(sst_files_dir_ + file_name);
997 }
998
999 do {
1000 Options options = CurrentOptions();
1001
1002 std::atomic<int> thread_num(0);
1003 std::function<void()> write_file_func = [&]() {
1004 int file_idx = thread_num.fetch_add(1);
1005 int range_start = file_idx * keys_per_file;
1006 int range_end = range_start + keys_per_file;
1007
1008 SstFileWriter sst_file_writer(EnvOptions(), options);
1009
1010 ASSERT_OK(sst_file_writer.Open(file_names[file_idx]));
1011
1012 for (int k = range_start; k < range_end; k++) {
11fdf7f2 1013 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
7c673cae
FG
1014 }
1015
1016 Status s = sst_file_writer.Finish();
1017 ASSERT_TRUE(s.ok()) << s.ToString();
1018 };
1019 // Write num_files files in parallel
1020 std::vector<port::Thread> sst_writer_threads;
1021 for (int i = 0; i < num_files; ++i) {
1022 sst_writer_threads.emplace_back(write_file_func);
1023 }
1024
1025 for (auto& t : sst_writer_threads) {
1026 t.join();
1027 }
1028
1029 fprintf(stderr, "Wrote %d files (%d keys)\n", num_files,
1030 num_files * keys_per_file);
1031
1032 thread_num.store(0);
1033 std::atomic<int> files_added(0);
1034 // Thread 0 -> Load {f0,f1}
1035 // Thread 1 -> Load {f0,f1}
1036 // Thread 2 -> Load {f2,f3}
1037 // Thread 3 -> Load {f2,f3}
1038 // Thread 4 -> Load {f4,f5}
1039 // Thread 5 -> Load {f4,f5}
1040 // ...
1041 std::function<void()> load_file_func = [&]() {
1042 // We intentionally add every file twice, and assert that it was added
1043 // only once and the other add failed
1044 int thread_id = thread_num.fetch_add(1);
1045 int file_idx = (thread_id / 2) * 2;
1046 // sometimes we use copy, sometimes link .. the result should be the same
1047 bool move_file = (thread_id % 3 == 0);
1048
1049 std::vector<std::string> files_to_add;
1050
1051 files_to_add = {file_names[file_idx]};
1052 if (static_cast<size_t>(file_idx + 1) < file_names.size()) {
1053 files_to_add.push_back(file_names[file_idx + 1]);
1054 }
1055
1056 Status s = DeprecatedAddFile(files_to_add, move_file);
1057 if (s.ok()) {
1058 files_added += static_cast<int>(files_to_add.size());
1059 }
1060 };
1061
1062 // Bulk load num_files files in parallel
1063 std::vector<port::Thread> add_file_threads;
1064 DestroyAndReopen(options);
1065 for (int i = 0; i < num_files; ++i) {
1066 add_file_threads.emplace_back(load_file_func);
1067 }
1068
1069 for (auto& t : add_file_threads) {
1070 t.join();
1071 }
1072 ASSERT_EQ(files_added.load(), num_files);
1073 fprintf(stderr, "Loaded %d files (%d keys)\n", num_files,
1074 num_files * keys_per_file);
1075
1076 // Overwrite values of keys divisible by 100
1077 for (int k = 0; k < num_files * keys_per_file; k += 100) {
1078 std::string key = Key(k);
1079 Status s = Put(key, key + "_new");
1080 ASSERT_TRUE(s.ok());
1081 }
1082
1083 for (int i = 0; i < 2; i++) {
1084 // Make sure the values are correct before and after flush/compaction
1085 for (int k = 0; k < num_files * keys_per_file; ++k) {
1086 std::string key = Key(k);
1087 std::string value = (k % 100 == 0) ? (key + "_new") : key;
1088 ASSERT_EQ(Get(key), value);
1089 }
1090 ASSERT_OK(Flush());
1091 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1092 }
1093
1094 fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
1095 DestroyAndRecreateExternalSSTFilesDir();
1096 } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
1097}
1098
1099TEST_F(ExternalSSTFileTest, OverlappingRanges) {
1100 Random rnd(301);
11fdf7f2 1101 SequenceNumber assigned_seqno = 0;
f67539c2
TL
1102 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1103 "ExternalSstFileIngestionJob::Run", [&assigned_seqno](void* arg) {
1104 ASSERT_TRUE(arg != nullptr);
1105 assigned_seqno = *(static_cast<SequenceNumber*>(arg));
1106 });
7c673cae 1107 bool need_flush = false;
f67539c2
TL
1108 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1109 "DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) {
1110 ASSERT_TRUE(arg != nullptr);
1111 need_flush = *(static_cast<bool*>(arg));
1112 });
7c673cae 1113 bool overlap_with_db = false;
f67539c2 1114 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae
FG
1115 "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
1116 [&overlap_with_db](void* arg) {
1117 ASSERT_TRUE(arg != nullptr);
1118 overlap_with_db = *(static_cast<bool*>(arg));
1119 });
f67539c2 1120 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1121 do {
1122 Options options = CurrentOptions();
1123 DestroyAndReopen(options);
1124
1125 SstFileWriter sst_file_writer(EnvOptions(), options);
1126
1127 printf("Option config = %d\n", option_config_);
1128 std::vector<std::pair<int, int>> key_ranges;
11fdf7f2 1129 for (int i = 0; i < 100; i++) {
7c673cae
FG
1130 int range_start = rnd.Uniform(20000);
1131 int keys_per_range = 10 + rnd.Uniform(41);
1132
1133 key_ranges.emplace_back(range_start, range_start + keys_per_range);
1134 }
1135
1136 int memtable_add = 0;
1137 int success_add_file = 0;
1138 int failed_add_file = 0;
1139 std::map<std::string, std::string> true_data;
1140 for (size_t i = 0; i < key_ranges.size(); i++) {
1141 int range_start = key_ranges[i].first;
1142 int range_end = key_ranges[i].second;
1143
1144 Status s;
1145 std::string range_val = "range_" + ToString(i);
1146
1147 // For 20% of ranges we use DB::Put, for 80% we use DB::AddFile
1148 if (i && i % 5 == 0) {
1149 // Use DB::Put to insert range (insert into memtable)
1150 range_val += "_put";
1151 for (int k = range_start; k <= range_end; k++) {
1152 s = Put(Key(k), range_val);
1153 ASSERT_OK(s);
1154 }
1155 memtable_add++;
1156 } else {
1157 // Use DB::AddFile to insert range
1158 range_val += "_add_file";
1159
1160 // Generate the file containing the range
1161 std::string file_name = sst_files_dir_ + env_->GenerateUniqueId();
1162 ASSERT_OK(sst_file_writer.Open(file_name));
1163 for (int k = range_start; k <= range_end; k++) {
11fdf7f2 1164 s = sst_file_writer.Put(Key(k), range_val);
7c673cae
FG
1165 ASSERT_OK(s);
1166 }
1167 ExternalSstFileInfo file_info;
1168 s = sst_file_writer.Finish(&file_info);
1169 ASSERT_OK(s);
1170
1171 // Insert the generated file
1172 s = DeprecatedAddFile({file_name});
1173 auto it = true_data.lower_bound(Key(range_start));
1174 if (option_config_ != kUniversalCompaction &&
11fdf7f2
TL
1175 option_config_ != kUniversalCompactionMultiLevel &&
1176 option_config_ != kUniversalSubcompactions) {
7c673cae
FG
1177 if (it != true_data.end() && it->first <= Key(range_end)) {
1178 // This range overlap with data already exist in DB
1179 ASSERT_NOK(s);
1180 failed_add_file++;
1181 } else {
1182 ASSERT_OK(s);
1183 success_add_file++;
1184 }
1185 } else {
1186 if ((it != true_data.end() && it->first <= Key(range_end)) ||
11fdf7f2 1187 need_flush || assigned_seqno > 0 || overlap_with_db) {
7c673cae
FG
1188 // This range overlap with data already exist in DB
1189 ASSERT_NOK(s);
1190 failed_add_file++;
1191 } else {
1192 ASSERT_OK(s);
1193 success_add_file++;
1194 }
1195 }
1196 }
1197
1198 if (s.ok()) {
1199 // Update true_data map to include the new inserted data
1200 for (int k = range_start; k <= range_end; k++) {
1201 true_data[Key(k)] = range_val;
1202 }
1203 }
1204
1205 // Flush / Compact the DB
1206 if (i && i % 50 == 0) {
1207 Flush();
1208 }
1209 if (i && i % 75 == 0) {
1210 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1211 }
1212 }
1213
1214 printf("Total: %" ROCKSDB_PRIszt
1215 " ranges\n"
1216 "AddFile()|Success: %d ranges\n"
1217 "AddFile()|RangeConflict: %d ranges\n"
1218 "Put(): %d ranges\n",
1219 key_ranges.size(), success_add_file, failed_add_file, memtable_add);
1220
1221 // Verify the correctness of the data
1222 for (const auto& kv : true_data) {
1223 ASSERT_EQ(Get(kv.first), kv.second);
1224 }
1225 printf("keys/values verified\n");
1226 DestroyAndRecreateExternalSSTFilesDir();
1227 } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
1228}
1229
494da23a 1230TEST_P(ExternalSSTFileTest, PickedLevel) {
7c673cae
FG
1231 Options options = CurrentOptions();
1232 options.disable_auto_compactions = false;
1233 options.level0_file_num_compaction_trigger = 4;
1234 options.num_levels = 4;
1235 DestroyAndReopen(options);
1236
1237 std::map<std::string, std::string> true_data;
1238
1239 // File 0 will go to last level (L3)
494da23a
TL
1240 ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, -1, false, false, true,
1241 false, false, &true_data));
7c673cae
FG
1242 EXPECT_EQ(FilesPerLevel(), "0,0,0,1");
1243
1244 // File 1 will go to level L2 (since it overlap with file 0 in L3)
494da23a
TL
1245 ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, -1, false, false, true,
1246 false, false, &true_data));
7c673cae
FG
1247 EXPECT_EQ(FilesPerLevel(), "0,0,1,1");
1248
f67539c2 1249 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
1250 {"ExternalSSTFileTest::PickedLevel:0", "BackgroundCallCompaction:0"},
1251 {"DBImpl::BackgroundCompaction:Start",
1252 "ExternalSSTFileTest::PickedLevel:1"},
1253 {"ExternalSSTFileTest::PickedLevel:2",
1254 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
1255 });
f67539c2 1256 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1257
1258 // Flush 4 files containing the same keys
1259 for (int i = 0; i < 4; i++) {
1260 ASSERT_OK(Put(Key(3), Key(3) + "put"));
1261 ASSERT_OK(Put(Key(8), Key(8) + "put"));
1262 true_data[Key(3)] = Key(3) + "put";
1263 true_data[Key(8)] = Key(8) + "put";
1264 ASSERT_OK(Flush());
1265 }
1266
1267 // Wait for BackgroundCompaction() to be called
1268 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:0");
1269 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:1");
1270
1271 EXPECT_EQ(FilesPerLevel(), "4,0,1,1");
1272
1273 // This file overlaps with file 0 (L3), file 1 (L2) and the
1274 // output of compaction going to L1
494da23a
TL
1275 ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true,
1276 false, false, &true_data));
7c673cae
FG
1277 EXPECT_EQ(FilesPerLevel(), "5,0,1,1");
1278
1279 // This file does not overlap with any file or with the running compaction
1280 ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false,
494da23a 1281 false, false, false, &true_data));
7c673cae
FG
1282 EXPECT_EQ(FilesPerLevel(), "5,0,1,2");
1283
1284 // Hold compaction from finishing
1285 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:2");
1286
1287 dbfull()->TEST_WaitForCompact();
1288 EXPECT_EQ(FilesPerLevel(), "1,1,1,2");
1289
1290 size_t kcnt = 0;
1291 VerifyDBFromMap(true_data, &kcnt, false);
1292
f67539c2 1293 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
1294}
1295
1296TEST_F(ExternalSSTFileTest, PickedLevelBug) {
1297 Options options = CurrentOptions();
1298 options.disable_auto_compactions = false;
1299 options.level0_file_num_compaction_trigger = 3;
1300 options.num_levels = 2;
1301 DestroyAndReopen(options);
1302
1303 std::vector<int> file_keys;
1304
1305 // file #1 in L0
1306 file_keys = {0, 5, 7};
1307 for (int k : file_keys) {
1308 ASSERT_OK(Put(Key(k), Key(k)));
1309 }
1310 ASSERT_OK(Flush());
1311
1312 // file #2 in L0
1313 file_keys = {4, 6, 8, 9};
1314 for (int k : file_keys) {
1315 ASSERT_OK(Put(Key(k), Key(k)));
1316 }
1317 ASSERT_OK(Flush());
1318
1319 // We have 2 overlapping files in L0
1320 EXPECT_EQ(FilesPerLevel(), "2");
1321
f67539c2 1322 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
7c673cae
FG
1323 {{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
1324 {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
1325 {"ExternalSSTFileTest::PickedLevelBug:2",
1326 "DBImpl::RunManualCompaction:0"},
1327 {"ExternalSSTFileTest::PickedLevelBug:3",
1328 "DBImpl::RunManualCompaction:1"}});
1329
1330 std::atomic<bool> bg_compact_started(false);
f67539c2 1331 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae 1332 "DBImpl::BackgroundCompaction:Start",
11fdf7f2 1333 [&](void* /*arg*/) { bg_compact_started.store(true); });
7c673cae 1334
f67539c2 1335 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1336
1337 // While writing the MANIFEST start a thread that will ask for compaction
f67539c2 1338 ROCKSDB_NAMESPACE::port::Thread bg_compact([&]() {
7c673cae
FG
1339 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1340 });
1341 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
1342
1343 // Start a thread that will ingest a new file
f67539c2 1344 ROCKSDB_NAMESPACE::port::Thread bg_addfile([&]() {
7c673cae
FG
1345 file_keys = {1, 2, 3};
1346 ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
1347 });
1348
1349 // Wait for AddFile to start picking levels and writing MANIFEST
1350 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
1351
1352 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
1353
1354 // We need to verify that no compactions can run while AddFile is
1355 // ingesting the files into the levels it find suitable. So we will
1356 // wait for 2 seconds to give a chance for compactions to run during
1357 // this period, and then make sure that no compactions where able to run
1358 env_->SleepForMicroseconds(1000000 * 2);
1359 ASSERT_FALSE(bg_compact_started.load());
1360
1361 // Hold AddFile from finishing writing the MANIFEST
1362 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
1363
1364 bg_addfile.join();
1365 bg_compact.join();
1366
1367 dbfull()->TEST_WaitForCompact();
1368
1369 int total_keys = 0;
1370 Iterator* iter = db_->NewIterator(ReadOptions());
1371 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1372 ASSERT_OK(iter->status());
1373 total_keys++;
1374 }
1375 ASSERT_EQ(total_keys, 10);
1376
1377 delete iter;
1378
f67539c2 1379 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
1380}
1381
11fdf7f2
TL
1382TEST_F(ExternalSSTFileTest, IngestNonExistingFile) {
1383 Options options = CurrentOptions();
1384 DestroyAndReopen(options);
1385
1386 Status s = db_->IngestExternalFile({"non_existing_file"},
1387 IngestExternalFileOptions());
1388 ASSERT_NOK(s);
1389
1390 // Verify file deletion is not impacted (verify a bug fix)
1391 ASSERT_OK(Put(Key(1), Key(1)));
1392 ASSERT_OK(Put(Key(9), Key(9)));
1393 ASSERT_OK(Flush());
1394
1395 ASSERT_OK(Put(Key(1), Key(1)));
1396 ASSERT_OK(Put(Key(9), Key(9)));
1397 ASSERT_OK(Flush());
1398
1399 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
494da23a 1400 ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
11fdf7f2
TL
1401
1402 // After full compaction, there should be only 1 file.
1403 std::vector<std::string> files;
1404 env_->GetChildren(dbname_, &files);
1405 int num_sst_files = 0;
1406 for (auto& f : files) {
1407 uint64_t number;
1408 FileType type;
1409 if (ParseFileName(f, &number, &type) && type == kTableFile) {
1410 num_sst_files++;
1411 }
1412 }
1413 ASSERT_EQ(1, num_sst_files);
1414}
1415
7c673cae
FG
1416TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
1417 Options options = CurrentOptions();
1418 options.disable_auto_compactions = false;
1419 options.level0_file_num_compaction_trigger = 2;
1420 options.num_levels = 2;
1421 DestroyAndReopen(options);
1422
1423 std::function<void()> bg_compact = [&]() {
1424 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1425 };
1426
1427 int range_id = 0;
1428 std::vector<int> file_keys;
1429 std::function<void()> bg_addfile = [&]() {
1430 ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
1431 };
1432
11fdf7f2 1433 const int num_of_ranges = 1000;
7c673cae 1434 std::vector<port::Thread> threads;
11fdf7f2 1435 while (range_id < num_of_ranges) {
7c673cae
FG
1436 int range_start = range_id * 10;
1437 int range_end = range_start + 10;
1438
1439 file_keys.clear();
1440 for (int k = range_start + 1; k < range_end; k++) {
1441 file_keys.push_back(k);
1442 }
1443 ASSERT_OK(Put(Key(range_start), Key(range_start)));
1444 ASSERT_OK(Put(Key(range_end), Key(range_end)));
1445 ASSERT_OK(Flush());
1446
1447 if (range_id % 10 == 0) {
1448 threads.emplace_back(bg_compact);
1449 }
1450 threads.emplace_back(bg_addfile);
1451
1452 for (auto& t : threads) {
1453 t.join();
1454 }
1455 threads.clear();
1456
1457 range_id++;
1458 }
1459
11fdf7f2 1460 for (int rid = 0; rid < num_of_ranges; rid++) {
7c673cae
FG
1461 int range_start = rid * 10;
1462 int range_end = range_start + 10;
1463
1464 ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid;
1465 ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid;
1466 for (int k = range_start + 1; k < range_end; k++) {
1467 std::string v = Key(k) + ToString(rid);
1468 ASSERT_EQ(Get(Key(k)), v) << rid;
1469 }
1470 }
1471}
1472
1473TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
1474 Options options = CurrentOptions();
1475 options.disable_auto_compactions = false;
1476 options.level0_file_num_compaction_trigger = 4;
1477 options.level_compaction_dynamic_level_bytes = true;
1478 options.num_levels = 4;
1479 DestroyAndReopen(options);
1480 std::map<std::string, std::string> true_data;
1481
f67539c2 1482 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
1483 {"ExternalSSTFileTest::PickedLevelDynamic:0",
1484 "BackgroundCallCompaction:0"},
1485 {"DBImpl::BackgroundCompaction:Start",
1486 "ExternalSSTFileTest::PickedLevelDynamic:1"},
1487 {"ExternalSSTFileTest::PickedLevelDynamic:2",
1488 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
1489 });
f67539c2 1490 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1491
1492 // Flush 4 files containing the same keys
1493 for (int i = 0; i < 4; i++) {
1494 for (int k = 20; k <= 30; k++) {
1495 ASSERT_OK(Put(Key(k), Key(k) + "put"));
1496 true_data[Key(k)] = Key(k) + "put";
1497 }
1498 for (int k = 50; k <= 60; k++) {
1499 ASSERT_OK(Put(Key(k), Key(k) + "put"));
1500 true_data[Key(k)] = Key(k) + "put";
1501 }
1502 ASSERT_OK(Flush());
1503 }
1504
1505 // Wait for BackgroundCompaction() to be called
1506 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:0");
1507 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:1");
1508
1509 // This file overlaps with the output of the compaction (going to L3)
1510 // so the file will be added to L0 since L3 is the base level
1511 ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false,
494da23a 1512 false, true, false, false, &true_data));
7c673cae
FG
1513 EXPECT_EQ(FilesPerLevel(), "5");
1514
1515 // This file does not overlap with the current running compactiong
1516 ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false,
494da23a 1517 true, false, false, &true_data));
7c673cae
FG
1518 EXPECT_EQ(FilesPerLevel(), "5,0,0,1");
1519
1520 // Hold compaction from finishing
1521 TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:2");
1522
1523 // Output of the compaction will go to L3
1524 dbfull()->TEST_WaitForCompact();
1525 EXPECT_EQ(FilesPerLevel(), "1,0,0,2");
1526
1527 Close();
1528 options.disable_auto_compactions = true;
1529 Reopen(options);
1530
1531 ASSERT_OK(GenerateAndAddExternalFile(options, {1, 15, 19}, -1, false, false,
494da23a 1532 true, false, false, &true_data));
7c673cae
FG
1533 ASSERT_EQ(FilesPerLevel(), "1,0,0,3");
1534
1535 ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1001, 1002}, -1, false,
494da23a 1536 false, true, false, false, &true_data));
7c673cae
FG
1537 ASSERT_EQ(FilesPerLevel(), "1,0,0,4");
1538
1539 ASSERT_OK(GenerateAndAddExternalFile(options, {500, 600, 700}, -1, false,
494da23a 1540 false, true, false, false, &true_data));
7c673cae
FG
1541 ASSERT_EQ(FilesPerLevel(), "1,0,0,5");
1542
1543 // File 5 overlaps with file 2 (L3 / base level)
494da23a
TL
1544 ASSERT_OK(GenerateAndAddExternalFile(options, {2, 10}, -1, false, false, true,
1545 false, false, &true_data));
7c673cae
FG
1546 ASSERT_EQ(FilesPerLevel(), "2,0,0,5");
1547
1548 // File 6 overlaps with file 2 (L3 / base level) and file 5 (L0)
494da23a
TL
1549 ASSERT_OK(GenerateAndAddExternalFile(options, {3, 9}, -1, false, false, true,
1550 false, false, &true_data));
7c673cae
FG
1551 ASSERT_EQ(FilesPerLevel(), "3,0,0,5");
1552
1553 // Verify data in files
1554 size_t kcnt = 0;
1555 VerifyDBFromMap(true_data, &kcnt, false);
1556
1557 // Write range [5 => 10] to L0
1558 for (int i = 5; i <= 10; i++) {
1559 std::string k = Key(i);
1560 std::string v = k + "put";
1561 ASSERT_OK(Put(k, v));
1562 true_data[k] = v;
1563 }
1564 ASSERT_OK(Flush());
1565 ASSERT_EQ(FilesPerLevel(), "4,0,0,5");
1566
1567 // File 7 overlaps with file 4 (L3)
1568 ASSERT_OK(GenerateAndAddExternalFile(options, {650, 651, 652}, -1, false,
494da23a 1569 false, true, false, false, &true_data));
7c673cae
FG
1570 ASSERT_EQ(FilesPerLevel(), "5,0,0,5");
1571
1572 VerifyDBFromMap(true_data, &kcnt, false);
1573
f67539c2 1574 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
1575}
1576
1577TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) {
1578 Options options = CurrentOptions();
1579 options.comparator = ReverseBytewiseComparator();
1580 DestroyAndReopen(options);
1581
1582 SstFileWriter sst_file_writer(EnvOptions(), options);
1583
1584 // Generate files with these key ranges
1585 // {14 -> 0}
1586 // {24 -> 10}
1587 // {34 -> 20}
1588 // {44 -> 30}
1589 // ..
1590 std::vector<std::string> generated_files;
1591 for (int i = 0; i < 10; i++) {
1592 std::string file_name = sst_files_dir_ + env_->GenerateUniqueId();
1593 ASSERT_OK(sst_file_writer.Open(file_name));
1594
1595 int range_end = i * 10;
1596 int range_start = range_end + 15;
1597 for (int k = (range_start - 1); k >= range_end; k--) {
11fdf7f2 1598 ASSERT_OK(sst_file_writer.Put(Key(k), Key(k)));
7c673cae
FG
1599 }
1600 ExternalSstFileInfo file_info;
1601 ASSERT_OK(sst_file_writer.Finish(&file_info));
1602 generated_files.push_back(file_name);
1603 }
1604
1605 std::vector<std::string> in_files;
1606
1607 // These 2nd and 3rd files overlap with each other
1608 in_files = {generated_files[0], generated_files[4], generated_files[5],
1609 generated_files[7]};
1610 ASSERT_NOK(DeprecatedAddFile(in_files));
1611
1612 // These 2 files dont overlap with each other
1613 in_files = {generated_files[0], generated_files[2]};
1614 ASSERT_OK(DeprecatedAddFile(in_files));
1615
1616 // These 2 files dont overlap with each other but overlap with keys in DB
1617 in_files = {generated_files[3], generated_files[7]};
1618 ASSERT_NOK(DeprecatedAddFile(in_files));
1619
1620 // Files dont overlap and dont overlap with DB key range
1621 in_files = {generated_files[4], generated_files[6], generated_files[8]};
1622 ASSERT_OK(DeprecatedAddFile(in_files));
1623
1624 for (int i = 0; i < 100; i++) {
1625 if (i % 20 <= 14) {
1626 ASSERT_EQ(Get(Key(i)), Key(i));
1627 } else {
1628 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
1629 }
1630 }
1631}
1632
1633TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
1634 Options options = CurrentOptions();
1635 options.num_levels = 3;
1636 options.IncreaseParallelism(20);
1637 DestroyAndReopen(options);
1638
1639 ASSERT_OK(GenerateAndAddExternalFile(options, {1, 4}, 1)); // L3
1640 ASSERT_OK(GenerateAndAddExternalFile(options, {2, 3}, 2)); // L2
1641
1642 ASSERT_OK(GenerateAndAddExternalFile(options, {10, 14}, 3)); // L3
1643 ASSERT_OK(GenerateAndAddExternalFile(options, {12, 13}, 4)); // L2
1644
1645 ASSERT_OK(GenerateAndAddExternalFile(options, {20, 24}, 5)); // L3
1646 ASSERT_OK(GenerateAndAddExternalFile(options, {22, 23}, 6)); // L2
1647
f67539c2 1648 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
11fdf7f2 1649 "CompactionJob::Run():Start", [&](void* /*arg*/) {
7c673cae
FG
1650 // fit in L3 but will overlap with compaction so will be added
1651 // to L2 but a compaction will trivially move it to L3
1652 // and break LSM consistency
11fdf7f2
TL
1653 static std::atomic<bool> called = {false};
1654 if (!called) {
1655 called = true;
1656 ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
1657 ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
1658 }
7c673cae 1659 });
f67539c2 1660 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1661
1662 CompactRangeOptions cro;
1663 cro.exclusive_manual_compaction = false;
1664 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1665
1666 dbfull()->TEST_WaitForCompact();
1667
f67539c2 1668 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
1669}
1670
1671TEST_F(ExternalSSTFileTest, CompactAddedFiles) {
1672 Options options = CurrentOptions();
1673 options.num_levels = 3;
1674 DestroyAndReopen(options);
1675
1676 ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, 1)); // L3
1677 ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, 2)); // L2
1678 ASSERT_OK(GenerateAndAddExternalFile(options, {3, 8}, 3)); // L1
1679 ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, 4)); // L0
1680
1681 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1682}
1683
1684TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) {
1685 Options options = CurrentOptions();
1686 DestroyAndReopen(options);
1687 std::string file_path = sst_files_dir_ + "/not_shared";
1688 SstFileWriter sst_file_writer(EnvOptions(), options);
1689
1690 std::string suffix(100, 'X');
1691 ASSERT_OK(sst_file_writer.Open(file_path));
11fdf7f2
TL
1692 ASSERT_OK(sst_file_writer.Put("A" + suffix, "VAL"));
1693 ASSERT_OK(sst_file_writer.Put("BB" + suffix, "VAL"));
1694 ASSERT_OK(sst_file_writer.Put("CC" + suffix, "VAL"));
1695 ASSERT_OK(sst_file_writer.Put("CXD" + suffix, "VAL"));
1696 ASSERT_OK(sst_file_writer.Put("CZZZ" + suffix, "VAL"));
1697 ASSERT_OK(sst_file_writer.Put("ZAAAX" + suffix, "VAL"));
7c673cae
FG
1698
1699 ASSERT_OK(sst_file_writer.Finish());
1700 ASSERT_OK(DeprecatedAddFile({file_path}));
1701}
1702
f67539c2
TL
1703TEST_F(ExternalSSTFileTest, WithUnorderedWrite) {
1704 SyncPoint::GetInstance()->DisableProcessing();
1705 SyncPoint::GetInstance()->LoadDependency(
1706 {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
1707 "ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"},
1708 {"DBImpl::WaitForPendingWrites:BeforeBlock",
1709 "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});
1710 SyncPoint::GetInstance()->SetCallBack(
1711 "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) {
1712 ASSERT_TRUE(*reinterpret_cast<bool*>(need_flush));
1713 });
1714
1715 Options options = CurrentOptions();
1716 options.unordered_write = true;
1717 DestroyAndReopen(options);
1718 Put("foo", "v1");
1719 SyncPoint::GetInstance()->EnableProcessing();
1720 port::Thread writer([&]() { Put("bar", "v2"); });
1721
1722 TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL");
1723 ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1,
1724 true /* allow_global_seqno */));
1725 ASSERT_EQ(Get("bar"), "v3");
1726
1727 writer.join();
1728 SyncPoint::GetInstance()->DisableProcessing();
1729 SyncPoint::GetInstance()->ClearAllCallBacks();
1730}
1731
494da23a 1732TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
7c673cae
FG
1733 Options options = CurrentOptions();
1734 options.IncreaseParallelism(20);
1735 options.level0_slowdown_writes_trigger = 256;
1736 options.level0_stop_writes_trigger = 256;
1737
494da23a
TL
1738 bool write_global_seqno = std::get<0>(GetParam());
1739 bool verify_checksums_before_ingest = std::get<1>(GetParam());
7c673cae
FG
1740 for (int iter = 0; iter < 2; iter++) {
1741 bool write_to_memtable = (iter == 0);
1742 DestroyAndReopen(options);
1743
1744 Random rnd(301);
1745 std::map<std::string, std::string> true_data;
11fdf7f2 1746 for (int i = 0; i < 500; i++) {
7c673cae
FG
1747 std::vector<std::pair<std::string, std::string>> random_data;
1748 for (int j = 0; j < 100; j++) {
1749 std::string k;
1750 std::string v;
1751 test::RandomString(&rnd, rnd.Next() % 20, &k);
1752 test::RandomString(&rnd, rnd.Next() % 50, &v);
1753 random_data.emplace_back(k, v);
1754 }
1755
1756 if (write_to_memtable && rnd.OneIn(4)) {
1757 // 25% of writes go through memtable
1758 for (auto& entry : random_data) {
1759 ASSERT_OK(Put(entry.first, entry.second));
1760 true_data[entry.first] = entry.second;
1761 }
1762 } else {
494da23a
TL
1763 ASSERT_OK(GenerateAndAddExternalFile(
1764 options, random_data, -1, true, write_global_seqno,
1765 verify_checksums_before_ingest, false, true, &true_data));
7c673cae
FG
1766 }
1767 }
1768 size_t kcnt = 0;
1769 VerifyDBFromMap(true_data, &kcnt, false);
1770 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1771 VerifyDBFromMap(true_data, &kcnt, false);
1772 }
1773}
1774
494da23a 1775TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoAssignedLevel) {
7c673cae
FG
1776 Options options = CurrentOptions();
1777 options.num_levels = 5;
1778 options.disable_auto_compactions = true;
1779 DestroyAndReopen(options);
1780 std::vector<std::pair<std::string, std::string>> file_data;
1781 std::map<std::string, std::string> true_data;
1782
1783 // Insert 100 -> 200 into the memtable
1784 for (int i = 100; i <= 200; i++) {
1785 ASSERT_OK(Put(Key(i), "memtable"));
1786 true_data[Key(i)] = "memtable";
1787 }
1788
1789 // Insert 0 -> 20 using AddFile
1790 file_data.clear();
1791 for (int i = 0; i <= 20; i++) {
1792 file_data.emplace_back(Key(i), "L4");
1793 }
494da23a
TL
1794 bool write_global_seqno = std::get<0>(GetParam());
1795 bool verify_checksums_before_ingest = std::get<1>(GetParam());
1796 ASSERT_OK(GenerateAndAddExternalFile(
1797 options, file_data, -1, true, write_global_seqno,
1798 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1799
1800 // This file dont overlap with anything in the DB, will go to L4
1801 ASSERT_EQ("0,0,0,0,1", FilesPerLevel());
1802
1803 // Insert 80 -> 130 using AddFile
1804 file_data.clear();
1805 for (int i = 80; i <= 130; i++) {
1806 file_data.emplace_back(Key(i), "L0");
1807 }
494da23a
TL
1808 ASSERT_OK(GenerateAndAddExternalFile(
1809 options, file_data, -1, true, write_global_seqno,
1810 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1811
1812 // This file overlap with the memtable, so it will flush it and add
1813 // it self to L0
1814 ASSERT_EQ("2,0,0,0,1", FilesPerLevel());
1815
1816 // Insert 30 -> 50 using AddFile
1817 file_data.clear();
1818 for (int i = 30; i <= 50; i++) {
1819 file_data.emplace_back(Key(i), "L4");
1820 }
494da23a
TL
1821 ASSERT_OK(GenerateAndAddExternalFile(
1822 options, file_data, -1, true, write_global_seqno,
1823 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1824
1825 // This file dont overlap with anything in the DB and fit in L4 as well
1826 ASSERT_EQ("2,0,0,0,2", FilesPerLevel());
1827
1828 // Insert 10 -> 40 using AddFile
1829 file_data.clear();
1830 for (int i = 10; i <= 40; i++) {
1831 file_data.emplace_back(Key(i), "L3");
1832 }
494da23a
TL
1833 ASSERT_OK(GenerateAndAddExternalFile(
1834 options, file_data, -1, true, write_global_seqno,
1835 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1836
1837 // This file overlap with files in L4, we will ingest it in L3
1838 ASSERT_EQ("2,0,0,1,2", FilesPerLevel());
1839
1840 size_t kcnt = 0;
1841 VerifyDBFromMap(true_data, &kcnt, false);
1842}
1843
494da23a 1844TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) {
7c673cae
FG
1845 Options options = CurrentOptions();
1846 DestroyAndReopen(options);
1847 uint64_t entries_in_memtable;
1848 std::map<std::string, std::string> true_data;
1849
1850 for (int k : {10, 20, 40, 80}) {
1851 ASSERT_OK(Put(Key(k), "memtable"));
1852 true_data[Key(k)] = "memtable";
1853 }
1854 db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
1855 &entries_in_memtable);
1856 ASSERT_GE(entries_in_memtable, 1);
1857
494da23a
TL
1858 bool write_global_seqno = std::get<0>(GetParam());
1859 bool verify_checksums_before_ingest = std::get<1>(GetParam());
7c673cae 1860 // No need for flush
494da23a
TL
1861 ASSERT_OK(GenerateAndAddExternalFile(
1862 options, {90, 100, 110}, -1, true, write_global_seqno,
1863 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1864 db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
1865 &entries_in_memtable);
1866 ASSERT_GE(entries_in_memtable, 1);
1867
1868 // This file will flush the memtable
494da23a
TL
1869 ASSERT_OK(GenerateAndAddExternalFile(
1870 options, {19, 20, 21}, -1, true, write_global_seqno,
1871 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1872 db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
1873 &entries_in_memtable);
1874 ASSERT_EQ(entries_in_memtable, 0);
1875
1876 for (int k : {200, 201, 205, 206}) {
1877 ASSERT_OK(Put(Key(k), "memtable"));
1878 true_data[Key(k)] = "memtable";
1879 }
1880 db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
1881 &entries_in_memtable);
1882 ASSERT_GE(entries_in_memtable, 1);
1883
1884 // No need for flush, this file keys fit between the memtable keys
494da23a
TL
1885 ASSERT_OK(GenerateAndAddExternalFile(
1886 options, {202, 203, 204}, -1, true, write_global_seqno,
1887 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1888 db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
1889 &entries_in_memtable);
1890 ASSERT_GE(entries_in_memtable, 1);
1891
1892 // This file will flush the memtable
494da23a
TL
1893 ASSERT_OK(GenerateAndAddExternalFile(
1894 options, {206, 207}, -1, true, write_global_seqno,
1895 verify_checksums_before_ingest, false, false, &true_data));
7c673cae
FG
1896 db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
1897 &entries_in_memtable);
1898 ASSERT_EQ(entries_in_memtable, 0);
1899
1900 size_t kcnt = 0;
1901 VerifyDBFromMap(true_data, &kcnt, false);
1902}
1903
494da23a 1904TEST_P(ExternalSSTFileTest, L0SortingIssue) {
7c673cae
FG
1905 Options options = CurrentOptions();
1906 options.num_levels = 2;
1907 DestroyAndReopen(options);
1908 std::map<std::string, std::string> true_data;
1909
1910 ASSERT_OK(Put(Key(1), "memtable"));
1911 ASSERT_OK(Put(Key(10), "memtable"));
1912
494da23a
TL
1913 bool write_global_seqno = std::get<0>(GetParam());
1914 bool verify_checksums_before_ingest = std::get<1>(GetParam());
7c673cae 1915 // No Flush needed, No global seqno needed, Ingest in L1
494da23a
TL
1916 ASSERT_OK(
1917 GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno,
1918 verify_checksums_before_ingest, false, false));
7c673cae 1919 // No Flush needed, but need a global seqno, Ingest in L0
494da23a
TL
1920 ASSERT_OK(
1921 GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno,
1922 verify_checksums_before_ingest, false, false));
7c673cae
FG
1923 printf("%s\n", FilesPerLevel().c_str());
1924
1925 // Overwrite what we added using external files
1926 ASSERT_OK(Put(Key(7), "memtable"));
1927 ASSERT_OK(Put(Key(8), "memtable"));
1928
1929 // Read values from memtable
1930 ASSERT_EQ(Get(Key(7)), "memtable");
1931 ASSERT_EQ(Get(Key(8)), "memtable");
1932
1933 // Flush and read from L0
1934 ASSERT_OK(Flush());
1935 printf("%s\n", FilesPerLevel().c_str());
1936 ASSERT_EQ(Get(Key(7)), "memtable");
1937 ASSERT_EQ(Get(Key(8)), "memtable");
1938}
1939
1940TEST_F(ExternalSSTFileTest, CompactionDeadlock) {
1941 Options options = CurrentOptions();
1942 options.num_levels = 2;
1943 options.level0_file_num_compaction_trigger = 4;
1944 options.level0_slowdown_writes_trigger = 4;
1945 options.level0_stop_writes_trigger = 4;
1946 DestroyAndReopen(options);
1947
1948 // atomic conter of currently running bg threads
1949 std::atomic<int> running_threads(0);
1950
f67539c2 1951 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
1952 {"DBImpl::DelayWrite:Wait", "ExternalSSTFileTest::DeadLock:0"},
1953 {"ExternalSSTFileTest::DeadLock:1", "DBImpl::AddFile:Start"},
1954 {"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::DeadLock:2"},
1955 {"ExternalSSTFileTest::DeadLock:3", "BackgroundCallCompaction:0"},
1956 });
f67539c2 1957 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
1958
1959 // Start ingesting and extrnal file in the background
f67539c2 1960 ROCKSDB_NAMESPACE::port::Thread bg_ingest_file([&]() {
7c673cae
FG
1961 running_threads += 1;
1962 ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6}));
1963 running_threads -= 1;
1964 });
1965
1966 ASSERT_OK(Put(Key(1), "memtable"));
1967 ASSERT_OK(Flush());
1968
1969 ASSERT_OK(Put(Key(2), "memtable"));
1970 ASSERT_OK(Flush());
1971
1972 ASSERT_OK(Put(Key(3), "memtable"));
1973 ASSERT_OK(Flush());
1974
1975 ASSERT_OK(Put(Key(4), "memtable"));
1976 ASSERT_OK(Flush());
1977
1978 // This thread will try to insert into the memtable but since we have 4 L0
1979 // files this thread will be blocked and hold the writer thread
f67539c2 1980 ROCKSDB_NAMESPACE::port::Thread bg_block_put([&]() {
7c673cae
FG
1981 running_threads += 1;
1982 ASSERT_OK(Put(Key(10), "memtable"));
1983 running_threads -= 1;
1984 });
1985
1986 // Make sure DelayWrite is called first
1987 TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:0");
1988
1989 // `DBImpl::AddFile:Start` will wait until we be here
1990 TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:1");
1991
1992 // Wait for IngestExternalFile() to start and aquire mutex
1993 TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:2");
1994
1995 // Now let compaction start
1996 TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:3");
1997
1998 // Wait for max 5 seconds, if we did not finish all bg threads
1999 // then we hit the deadlock bug
2000 for (int i = 0; i < 10; i++) {
2001 if (running_threads.load() == 0) {
2002 break;
2003 }
2004 env_->SleepForMicroseconds(500000);
2005 }
2006
2007 ASSERT_EQ(running_threads.load(), 0);
2008
2009 bg_ingest_file.join();
2010 bg_block_put.join();
2011}
2012
2013TEST_F(ExternalSSTFileTest, DirtyExit) {
2014 Options options = CurrentOptions();
2015 DestroyAndReopen(options);
2016 std::string file_path = sst_files_dir_ + "/dirty_exit";
2017 std::unique_ptr<SstFileWriter> sst_file_writer;
2018
2019 // Destruct SstFileWriter without calling Finish()
2020 sst_file_writer.reset(new SstFileWriter(EnvOptions(), options));
2021 ASSERT_OK(sst_file_writer->Open(file_path));
2022 sst_file_writer.reset();
2023
2024 // Destruct SstFileWriter with a failing Finish
2025 sst_file_writer.reset(new SstFileWriter(EnvOptions(), options));
2026 ASSERT_OK(sst_file_writer->Open(file_path));
2027 ASSERT_NOK(sst_file_writer->Finish());
2028}
2029
2030TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
2031 Options options = CurrentOptions();
2032 CreateAndReopenWithCF({"koko", "toto"}, options);
2033
2034 SstFileWriter sfw_default(EnvOptions(), options, handles_[0]);
2035 SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
2036 SstFileWriter sfw_cf2(EnvOptions(), options, handles_[2]);
2037 SstFileWriter sfw_unknown(EnvOptions(), options);
2038
2039 // default_cf.sst
2040 const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
2041 ASSERT_OK(sfw_default.Open(cf_default_sst));
11fdf7f2
TL
2042 ASSERT_OK(sfw_default.Put("K1", "V1"));
2043 ASSERT_OK(sfw_default.Put("K2", "V2"));
7c673cae
FG
2044 ASSERT_OK(sfw_default.Finish());
2045
2046 // cf1.sst
2047 const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
2048 ASSERT_OK(sfw_cf1.Open(cf1_sst));
11fdf7f2
TL
2049 ASSERT_OK(sfw_cf1.Put("K3", "V1"));
2050 ASSERT_OK(sfw_cf1.Put("K4", "V2"));
7c673cae
FG
2051 ASSERT_OK(sfw_cf1.Finish());
2052
2053 // cf_unknown.sst
2054 const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
2055 ASSERT_OK(sfw_unknown.Open(unknown_sst));
11fdf7f2
TL
2056 ASSERT_OK(sfw_unknown.Put("K5", "V1"));
2057 ASSERT_OK(sfw_unknown.Put("K6", "V2"));
7c673cae
FG
2058 ASSERT_OK(sfw_unknown.Finish());
2059
2060 IngestExternalFileOptions ifo;
2061
2062 // SST CF dont match
2063 ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo));
2064 // SST CF dont match
2065 ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo));
2066 // SST CF match
2067 ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo));
2068
2069 // SST CF dont match
2070 ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo));
2071 // SST CF dont match
2072 ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo));
2073 // SST CF match
2074 ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo));
2075
2076 // SST CF unknown
2077 ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
2078 // SST CF unknown
2079 ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
2080 // SST CF unknown
2081 ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
2082
2083 // Cannot ingest a file into a dropped CF
2084 ASSERT_OK(db_->DropColumnFamily(handles_[1]));
2085 ASSERT_NOK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
2086
2087 // CF was not dropped, ok to Ingest
2088 ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
2089}
2090
11fdf7f2 2091/*
f67539c2
TL
2092 * Test and verify the functionality of ingestion_options.move_files and
2093 * ingestion_options.failed_move_fall_back_to_copy
11fdf7f2 2094 */
f67539c2
TL
2095TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) {
2096 const bool fail_link = std::get<0>(GetParam());
2097 const bool failed_move_fall_back_to_copy = std::get<1>(GetParam());
2098 test_env_->set_fail_link(fail_link);
2099 const EnvOptions env_options;
2100 DestroyAndReopen(options_);
11fdf7f2 2101 const int kNumKeys = 10000;
f67539c2
TL
2102 IngestExternalFileOptions ifo;
2103 ifo.move_files = true;
2104 ifo.failed_move_fall_back_to_copy = failed_move_fall_back_to_copy;
11fdf7f2
TL
2105
2106 std::string file_path = sst_files_dir_ + "file1.sst";
2107 // Create SstFileWriter for default column family
f67539c2 2108 SstFileWriter sst_file_writer(env_options, options_);
11fdf7f2
TL
2109 ASSERT_OK(sst_file_writer.Open(file_path));
2110 for (int i = 0; i < kNumKeys; i++) {
2111 ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_value"));
2112 }
2113 ASSERT_OK(sst_file_writer.Finish());
2114 uint64_t file_size = 0;
2115 ASSERT_OK(env_->GetFileSize(file_path, &file_size));
2116
f67539c2
TL
2117 bool copyfile = false;
2118 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2119 "ExternalSstFileIngestionJob::Prepare:CopyFile",
2120 [&](void* /* arg */) { copyfile = true; });
2121 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2122
2123 const Status s = db_->IngestExternalFile({file_path}, ifo);
11fdf7f2
TL
2124
2125 ColumnFamilyHandleImpl* cfh =
2126 static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
2127 ColumnFamilyData* cfd = cfh->cfd();
2128 const InternalStats* internal_stats_ptr = cfd->internal_stats();
2129 const std::vector<InternalStats::CompactionStats>& comp_stats =
2130 internal_stats_ptr->TEST_GetCompactionStats();
2131 uint64_t bytes_copied = 0;
2132 uint64_t bytes_moved = 0;
2133 for (const auto& stats : comp_stats) {
2134 bytes_copied += stats.bytes_written;
2135 bytes_moved += stats.bytes_moved;
2136 }
f67539c2
TL
2137
2138 if (!fail_link) {
2139 // Link operation succeeds. External SST should be moved.
2140 ASSERT_OK(s);
11fdf7f2
TL
2141 ASSERT_EQ(0, bytes_copied);
2142 ASSERT_EQ(file_size, bytes_moved);
f67539c2 2143 ASSERT_FALSE(copyfile);
11fdf7f2 2144 } else {
f67539c2
TL
2145 // Link operation fails.
2146 ASSERT_EQ(0, bytes_moved);
2147 if (failed_move_fall_back_to_copy) {
2148 ASSERT_OK(s);
2149 // Copy file is true since a failed link falls back to copy file.
2150 ASSERT_TRUE(copyfile);
2151 ASSERT_EQ(file_size, bytes_copied);
2152 } else {
2153 ASSERT_TRUE(s.IsNotSupported());
2154 // Copy file is false since a failed link does not fall back to copy file.
2155 ASSERT_FALSE(copyfile);
2156 ASSERT_EQ(0, bytes_copied);
2157 }
11fdf7f2 2158 }
f67539c2 2159 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
11fdf7f2
TL
2160}
2161
7c673cae
FG
2162class TestIngestExternalFileListener : public EventListener {
2163 public:
11fdf7f2 2164 void OnExternalFileIngested(DB* /*db*/,
7c673cae
FG
2165 const ExternalFileIngestionInfo& info) override {
2166 ingested_files.push_back(info);
2167 }
2168
2169 std::vector<ExternalFileIngestionInfo> ingested_files;
2170};
2171
494da23a 2172TEST_P(ExternalSSTFileTest, IngestionListener) {
7c673cae
FG
2173 Options options = CurrentOptions();
2174 TestIngestExternalFileListener* listener =
2175 new TestIngestExternalFileListener();
2176 options.listeners.emplace_back(listener);
2177 CreateAndReopenWithCF({"koko", "toto"}, options);
2178
494da23a
TL
2179 bool write_global_seqno = std::get<0>(GetParam());
2180 bool verify_checksums_before_ingest = std::get<1>(GetParam());
7c673cae 2181 // Ingest into default cf
494da23a
TL
2182 ASSERT_OK(GenerateAndAddExternalFile(
2183 options, {1, 2}, -1, true, write_global_seqno,
2184 verify_checksums_before_ingest, false, true, nullptr, handles_[0]));
7c673cae
FG
2185 ASSERT_EQ(listener->ingested_files.size(), 1);
2186 ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
2187 ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
2188 ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2189 0);
2190 ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
2191 "default");
2192
2193 // Ingest into cf1
494da23a
TL
2194 ASSERT_OK(GenerateAndAddExternalFile(
2195 options, {1, 2}, -1, true, write_global_seqno,
2196 verify_checksums_before_ingest, false, true, nullptr, handles_[1]));
7c673cae
FG
2197 ASSERT_EQ(listener->ingested_files.size(), 2);
2198 ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
2199 ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
2200 ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2201 1);
2202 ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
2203 "koko");
2204
2205 // Ingest into cf2
494da23a
TL
2206 ASSERT_OK(GenerateAndAddExternalFile(
2207 options, {1, 2}, -1, true, write_global_seqno,
2208 verify_checksums_before_ingest, false, true, nullptr, handles_[2]));
7c673cae
FG
2209 ASSERT_EQ(listener->ingested_files.size(), 3);
2210 ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
2211 ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
2212 ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2213 2);
2214 ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
2215 "toto");
2216}
2217
2218TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) {
2219 Options options = CurrentOptions();
2220 DestroyAndReopen(options);
2221 const int kNumKeys = 10000;
2222
2223 // Insert keys using normal path and take a snapshot
2224 for (int i = 0; i < kNumKeys; i++) {
2225 ASSERT_OK(Put(Key(i), Key(i) + "_V1"));
2226 }
2227 const Snapshot* snap = db_->GetSnapshot();
2228
2229 // Overwrite all keys using IngestExternalFile
2230 std::string sst_file_path = sst_files_dir_ + "file1.sst";
2231 SstFileWriter sst_file_writer(EnvOptions(), options);
2232 ASSERT_OK(sst_file_writer.Open(sst_file_path));
2233 for (int i = 0; i < kNumKeys; i++) {
11fdf7f2 2234 ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_V2"));
7c673cae
FG
2235 }
2236 ASSERT_OK(sst_file_writer.Finish());
2237
2238 IngestExternalFileOptions ifo;
2239 ifo.move_files = true;
2240 ASSERT_OK(db_->IngestExternalFile({sst_file_path}, ifo));
2241
2242 for (int i = 0; i < kNumKeys; i++) {
2243 ASSERT_EQ(Get(Key(i), snap), Key(i) + "_V1");
2244 ASSERT_EQ(Get(Key(i)), Key(i) + "_V2");
2245 }
2246
2247 db_->ReleaseSnapshot(snap);
2248}
11fdf7f2 2249
494da23a 2250TEST_P(ExternalSSTFileTest, IngestBehind) {
11fdf7f2
TL
2251 Options options = CurrentOptions();
2252 options.compaction_style = kCompactionStyleUniversal;
2253 options.num_levels = 3;
2254 options.disable_auto_compactions = false;
2255 DestroyAndReopen(options);
2256 std::vector<std::pair<std::string, std::string>> file_data;
2257 std::map<std::string, std::string> true_data;
2258
2259 // Insert 100 -> 200 into the memtable
2260 for (int i = 100; i <= 200; i++) {
2261 ASSERT_OK(Put(Key(i), "memtable"));
2262 true_data[Key(i)] = "memtable";
2263 }
2264
2265 // Insert 100 -> 200 using IngestExternalFile
2266 file_data.clear();
2267 for (int i = 0; i <= 20; i++) {
2268 file_data.emplace_back(Key(i), "ingest_behind");
2269 }
2270
494da23a
TL
2271 bool allow_global_seqno = true;
2272 bool ingest_behind = true;
2273 bool write_global_seqno = std::get<0>(GetParam());
2274 bool verify_checksums_before_ingest = std::get<1>(GetParam());
11fdf7f2
TL
2275
2276 // Can't ingest behind since allow_ingest_behind isn't set to true
494da23a
TL
2277 ASSERT_NOK(GenerateAndAddExternalFile(
2278 options, file_data, -1, allow_global_seqno, write_global_seqno,
2279 verify_checksums_before_ingest, ingest_behind, false /*sort_data*/,
2280 &true_data));
11fdf7f2
TL
2281
2282 options.allow_ingest_behind = true;
2283 // check that we still can open the DB, as num_levels should be
2284 // sanitized to 3
2285 options.num_levels = 2;
2286 DestroyAndReopen(options);
2287
2288 options.num_levels = 3;
2289 DestroyAndReopen(options);
2290 // Insert 100 -> 200 into the memtable
2291 for (int i = 100; i <= 200; i++) {
2292 ASSERT_OK(Put(Key(i), "memtable"));
2293 true_data[Key(i)] = "memtable";
2294 }
2295 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
2296 // Universal picker should go at second from the bottom level
2297 ASSERT_EQ("0,1", FilesPerLevel());
494da23a
TL
2298 ASSERT_OK(GenerateAndAddExternalFile(
2299 options, file_data, -1, allow_global_seqno, write_global_seqno,
2300 verify_checksums_before_ingest, true /*ingest_behind*/,
2301 false /*sort_data*/, &true_data));
11fdf7f2
TL
2302 ASSERT_EQ("0,1,1", FilesPerLevel());
2303 // this time ingest should fail as the file doesn't fit to the bottom level
494da23a
TL
2304 ASSERT_NOK(GenerateAndAddExternalFile(
2305 options, file_data, -1, allow_global_seqno, write_global_seqno,
2306 verify_checksums_before_ingest, true /*ingest_behind*/,
2307 false /*sort_data*/, &true_data));
11fdf7f2
TL
2308 ASSERT_EQ("0,1,1", FilesPerLevel());
2309 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
2310 // bottom level should be empty
2311 ASSERT_EQ("0,1", FilesPerLevel());
2312
2313 size_t kcnt = 0;
2314 VerifyDBFromMap(true_data, &kcnt, false);
2315}
2316
2317TEST_F(ExternalSSTFileTest, SkipBloomFilter) {
2318 Options options = CurrentOptions();
2319
2320 BlockBasedTableOptions table_options;
2321 table_options.filter_policy.reset(NewBloomFilterPolicy(10));
2322 table_options.cache_index_and_filter_blocks = true;
2323 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
2324
2325
2326 // Create external SST file and include bloom filters
f67539c2 2327 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
11fdf7f2
TL
2328 DestroyAndReopen(options);
2329 {
2330 std::string file_path = sst_files_dir_ + "sst_with_bloom.sst";
2331 SstFileWriter sst_file_writer(EnvOptions(), options);
2332 ASSERT_OK(sst_file_writer.Open(file_path));
2333 ASSERT_OK(sst_file_writer.Put("Key1", "Value1"));
2334 ASSERT_OK(sst_file_writer.Finish());
2335
2336 ASSERT_OK(
2337 db_->IngestExternalFile({file_path}, IngestExternalFileOptions()));
2338
2339 ASSERT_EQ(Get("Key1"), "Value1");
2340 ASSERT_GE(
2341 options.statistics->getTickerCount(Tickers::BLOCK_CACHE_FILTER_ADD), 1);
2342 }
2343
2344 // Create external SST file but skip bloom filters
f67539c2 2345 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
11fdf7f2
TL
2346 DestroyAndReopen(options);
2347 {
2348 std::string file_path = sst_files_dir_ + "sst_with_no_bloom.sst";
2349 SstFileWriter sst_file_writer(EnvOptions(), options, nullptr, true,
2350 Env::IOPriority::IO_TOTAL,
2351 true /* skip_filters */);
2352 ASSERT_OK(sst_file_writer.Open(file_path));
2353 ASSERT_OK(sst_file_writer.Put("Key1", "Value1"));
2354 ASSERT_OK(sst_file_writer.Finish());
2355
2356 ASSERT_OK(
2357 db_->IngestExternalFile({file_path}, IngestExternalFileOptions()));
2358
2359 ASSERT_EQ(Get("Key1"), "Value1");
2360 ASSERT_EQ(
2361 options.statistics->getTickerCount(Tickers::BLOCK_CACHE_FILTER_ADD), 0);
2362 }
2363}
2364
494da23a
TL
2365TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) {
2366 if (!ZSTD_Supported()) {
2367 return;
2368 }
2369 const int kNumEntries = 1 << 10;
2370 const int kNumBytesPerEntry = 1 << 10;
2371 Options options = CurrentOptions();
2372 options.compression = kZSTD;
2373 options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
2374 options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
2375 DestroyAndReopen(options);
2376
2377 std::atomic<int> num_compression_dicts(0);
f67539c2 2378 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
494da23a
TL
2379 "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
2380 [&](void* /* arg */) { ++num_compression_dicts; });
f67539c2 2381 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
494da23a
TL
2382
2383 Random rnd(301);
2384 std::vector<std::pair<std::string, std::string>> random_data;
2385 for (int i = 0; i < kNumEntries; i++) {
2386 std::string val;
2387 test::RandomString(&rnd, kNumBytesPerEntry, &val);
2388 random_data.emplace_back(Key(i), std::move(val));
2389 }
2390 ASSERT_OK(GenerateAndAddExternalFile(options, std::move(random_data)));
2391 ASSERT_EQ(1, num_compression_dicts);
2392}
2393
2394TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
2395 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
2396 new FaultInjectionTestEnv(env_));
2397 Options options = CurrentOptions();
2398 options.env = fault_injection_env.get();
f67539c2 2399 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
494da23a
TL
2400 std::vector<ColumnFamilyHandle*> column_families;
2401 column_families.push_back(handles_[0]);
2402 column_families.push_back(handles_[1]);
f67539c2 2403 column_families.push_back(handles_[2]);
494da23a
TL
2404 std::vector<IngestExternalFileOptions> ifos(column_families.size());
2405 for (auto& ifo : ifos) {
2406 ifo.allow_global_seqno = true; // Always allow global_seqno
2407 // May or may not write global_seqno
2408 ifo.write_global_seqno = std::get<0>(GetParam());
2409 // Whether to verify checksums before ingestion
2410 ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
2411 }
2412 std::vector<std::vector<std::pair<std::string, std::string>>> data;
2413 data.push_back(
2414 {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
2415 data.push_back(
2416 {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
f67539c2
TL
2417 data.push_back(
2418 {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
2419
494da23a
TL
2420 // Resize the true_data vector upon construction to avoid re-alloc
2421 std::vector<std::map<std::string, std::string>> true_data(
2422 column_families.size());
2423 Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
2424 -1, true, true_data);
2425 ASSERT_OK(s);
2426 Close();
f67539c2
TL
2427 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2428 options);
2429 ASSERT_EQ(3, handles_.size());
494da23a
TL
2430 int cf = 0;
2431 for (const auto& verify_map : true_data) {
2432 for (const auto& elem : verify_map) {
2433 const std::string& key = elem.first;
2434 const std::string& value = elem.second;
2435 ASSERT_EQ(value, Get(cf, key));
2436 }
2437 ++cf;
2438 }
2439 Close();
2440 Destroy(options, true /* delete_cf_paths */);
2441}
2442
2443TEST_P(ExternalSSTFileTest,
2444 IngestFilesIntoMultipleColumnFamilies_NoMixedStateWithSnapshot) {
2445 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
2446 new FaultInjectionTestEnv(env_));
2447 SyncPoint::GetInstance()->DisableProcessing();
2448 SyncPoint::GetInstance()->ClearAllCallBacks();
2449 SyncPoint::GetInstance()->LoadDependency({
2450 {"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0",
2451 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
2452 "BeforeRead"},
2453 {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
2454 "AfterRead",
2455 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"},
2456 });
2457 SyncPoint::GetInstance()->EnableProcessing();
2458
2459 Options options = CurrentOptions();
2460 options.env = fault_injection_env.get();
f67539c2 2461 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
494da23a
TL
2462 const std::vector<std::map<std::string, std::string>> data_before_ingestion =
2463 {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
f67539c2
TL
2464 {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}},
2465 {{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}};
494da23a
TL
2466 for (size_t i = 0; i != handles_.size(); ++i) {
2467 int cf = static_cast<int>(i);
2468 const auto& orig_data = data_before_ingestion[i];
2469 for (const auto& kv : orig_data) {
2470 ASSERT_OK(Put(cf, kv.first, kv.second));
2471 }
2472 ASSERT_OK(Flush(cf));
2473 }
2474
2475 std::vector<ColumnFamilyHandle*> column_families;
2476 column_families.push_back(handles_[0]);
2477 column_families.push_back(handles_[1]);
f67539c2 2478 column_families.push_back(handles_[2]);
494da23a
TL
2479 std::vector<IngestExternalFileOptions> ifos(column_families.size());
2480 for (auto& ifo : ifos) {
2481 ifo.allow_global_seqno = true; // Always allow global_seqno
2482 // May or may not write global_seqno
2483 ifo.write_global_seqno = std::get<0>(GetParam());
2484 // Whether to verify checksums before ingestion
2485 ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
2486 }
2487 std::vector<std::vector<std::pair<std::string, std::string>>> data;
2488 data.push_back(
2489 {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
2490 data.push_back(
2491 {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
f67539c2
TL
2492 data.push_back(
2493 {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
494da23a
TL
2494 // Resize the true_data vector upon construction to avoid re-alloc
2495 std::vector<std::map<std::string, std::string>> true_data(
2496 column_families.size());
2497 // Take snapshot before ingestion starts
2498 ReadOptions read_opts;
2499 read_opts.total_order_seek = true;
2500 read_opts.snapshot = dbfull()->GetSnapshot();
2501 std::vector<Iterator*> iters(handles_.size());
2502
2503 // Range scan checks first kv of each CF before ingestion starts.
2504 for (size_t i = 0; i != handles_.size(); ++i) {
2505 iters[i] = dbfull()->NewIterator(read_opts, handles_[i]);
2506 iters[i]->SeekToFirst();
2507 ASSERT_TRUE(iters[i]->Valid());
2508 const std::string& key = iters[i]->key().ToString();
2509 const std::string& value = iters[i]->value().ToString();
2510 const std::map<std::string, std::string>& orig_data =
2511 data_before_ingestion[i];
2512 std::map<std::string, std::string>::const_iterator it = orig_data.find(key);
2513 ASSERT_NE(orig_data.end(), it);
2514 ASSERT_EQ(it->second, value);
2515 iters[i]->Next();
2516 }
2517 port::Thread ingest_thread([&]() {
2518 ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
2519 -1, true, true_data));
2520 });
2521 TEST_SYNC_POINT(
2522 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
2523 "BeforeRead");
2524 // Should see only data before ingestion
2525 for (size_t i = 0; i != handles_.size(); ++i) {
2526 const auto& orig_data = data_before_ingestion[i];
2527 for (; iters[i]->Valid(); iters[i]->Next()) {
2528 const std::string& key = iters[i]->key().ToString();
2529 const std::string& value = iters[i]->value().ToString();
2530 std::map<std::string, std::string>::const_iterator it =
2531 orig_data.find(key);
2532 ASSERT_NE(orig_data.end(), it);
2533 ASSERT_EQ(it->second, value);
2534 }
2535 }
2536 TEST_SYNC_POINT(
2537 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
2538 "AfterRead");
2539 ingest_thread.join();
2540 for (auto* iter : iters) {
2541 delete iter;
2542 }
2543 iters.clear();
2544 dbfull()->ReleaseSnapshot(read_opts.snapshot);
2545
2546 Close();
f67539c2
TL
2547 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2548 options);
494da23a
TL
2549 // Should see consistent state after ingestion for all column families even
2550 // without snapshot.
f67539c2 2551 ASSERT_EQ(3, handles_.size());
494da23a
TL
2552 int cf = 0;
2553 for (const auto& verify_map : true_data) {
2554 for (const auto& elem : verify_map) {
2555 const std::string& key = elem.first;
2556 const std::string& value = elem.second;
2557 ASSERT_EQ(value, Get(cf, key));
2558 }
2559 ++cf;
2560 }
2561 Close();
2562 Destroy(options, true /* delete_cf_paths */);
2563}
2564
2565TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
2566 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
2567 new FaultInjectionTestEnv(env_));
2568 Options options = CurrentOptions();
2569 options.env = fault_injection_env.get();
2570 SyncPoint::GetInstance()->DisableProcessing();
2571 SyncPoint::GetInstance()->ClearAllCallBacks();
2572 SyncPoint::GetInstance()->LoadDependency({
2573 {"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0",
2574 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
2575 "0"},
2576 {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
2577 "1",
2578 "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
2579 });
2580 SyncPoint::GetInstance()->EnableProcessing();
f67539c2 2581 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
494da23a
TL
2582 std::vector<ColumnFamilyHandle*> column_families;
2583 column_families.push_back(handles_[0]);
2584 column_families.push_back(handles_[1]);
f67539c2 2585 column_families.push_back(handles_[2]);
494da23a
TL
2586 std::vector<IngestExternalFileOptions> ifos(column_families.size());
2587 for (auto& ifo : ifos) {
2588 ifo.allow_global_seqno = true; // Always allow global_seqno
2589 // May or may not write global_seqno
2590 ifo.write_global_seqno = std::get<0>(GetParam());
2591 // Whether to verify block checksums before ingest
2592 ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
2593 }
2594 std::vector<std::vector<std::pair<std::string, std::string>>> data;
2595 data.push_back(
2596 {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
2597 data.push_back(
2598 {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
f67539c2
TL
2599 data.push_back(
2600 {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
2601
494da23a
TL
2602 // Resize the true_data vector upon construction to avoid re-alloc
2603 std::vector<std::map<std::string, std::string>> true_data(
2604 column_families.size());
2605 port::Thread ingest_thread([&]() {
2606 Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
2607 -1, true, true_data);
2608 ASSERT_NOK(s);
2609 });
2610 TEST_SYNC_POINT(
2611 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
2612 "0");
2613 fault_injection_env->SetFilesystemActive(false);
2614 TEST_SYNC_POINT(
2615 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
2616 "1");
2617 ingest_thread.join();
2618
2619 fault_injection_env->SetFilesystemActive(true);
2620 Close();
f67539c2
TL
2621 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2622 options);
2623 ASSERT_EQ(3, handles_.size());
494da23a
TL
2624 int cf = 0;
2625 for (const auto& verify_map : true_data) {
2626 for (const auto& elem : verify_map) {
2627 const std::string& key = elem.first;
2628 ASSERT_EQ("NOT_FOUND", Get(cf, key));
2629 }
2630 ++cf;
2631 }
2632 Close();
2633 Destroy(options, true /* delete_cf_paths */);
2634}
2635
2636TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
2637 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
2638 new FaultInjectionTestEnv(env_));
2639 Options options = CurrentOptions();
2640 options.env = fault_injection_env.get();
2641 SyncPoint::GetInstance()->DisableProcessing();
2642 SyncPoint::GetInstance()->ClearAllCallBacks();
2643 SyncPoint::GetInstance()->LoadDependency({
2644 {"DBImpl::IngestExternalFiles:BeforeJobsRun:0",
2645 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
2646 "0"},
2647 {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
2648 "1",
2649 "DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
2650 });
2651 SyncPoint::GetInstance()->EnableProcessing();
f67539c2 2652 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
494da23a
TL
2653 std::vector<ColumnFamilyHandle*> column_families;
2654 column_families.push_back(handles_[0]);
2655 column_families.push_back(handles_[1]);
f67539c2 2656 column_families.push_back(handles_[2]);
494da23a
TL
2657 std::vector<IngestExternalFileOptions> ifos(column_families.size());
2658 for (auto& ifo : ifos) {
2659 ifo.allow_global_seqno = true; // Always allow global_seqno
2660 // May or may not write global_seqno
2661 ifo.write_global_seqno = std::get<0>(GetParam());
2662 // Whether to verify block checksums before ingestion
2663 ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
2664 }
2665 std::vector<std::vector<std::pair<std::string, std::string>>> data;
2666 data.push_back(
2667 {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
2668 data.push_back(
2669 {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
f67539c2
TL
2670 data.push_back(
2671 {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
494da23a
TL
2672 // Resize the true_data vector upon construction to avoid re-alloc
2673 std::vector<std::map<std::string, std::string>> true_data(
2674 column_families.size());
2675 port::Thread ingest_thread([&]() {
2676 Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
2677 -1, true, true_data);
2678 ASSERT_NOK(s);
2679 });
2680 TEST_SYNC_POINT(
2681 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
2682 "0");
2683 fault_injection_env->SetFilesystemActive(false);
2684 TEST_SYNC_POINT(
2685 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
2686 "1");
2687 ingest_thread.join();
2688
2689 fault_injection_env->SetFilesystemActive(true);
2690 Close();
f67539c2
TL
2691 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2692 options);
2693 ASSERT_EQ(3, handles_.size());
494da23a
TL
2694 int cf = 0;
2695 for (const auto& verify_map : true_data) {
2696 for (const auto& elem : verify_map) {
2697 const std::string& key = elem.first;
2698 ASSERT_EQ("NOT_FOUND", Get(cf, key));
2699 }
2700 ++cf;
2701 }
2702 Close();
2703 Destroy(options, true /* delete_cf_paths */);
2704}
2705
2706TEST_P(ExternalSSTFileTest,
2707 IngestFilesIntoMultipleColumnFamilies_PartialManifestWriteFail) {
2708 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
2709 new FaultInjectionTestEnv(env_));
2710 Options options = CurrentOptions();
2711 options.env = fault_injection_env.get();
2712
f67539c2 2713 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
494da23a
TL
2714
2715 SyncPoint::GetInstance()->ClearTrace();
2716 SyncPoint::GetInstance()->DisableProcessing();
2717 SyncPoint::GetInstance()->ClearAllCallBacks();
2718 SyncPoint::GetInstance()->LoadDependency({
2719 {"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
2720 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
2721 "PartialManifestWriteFail:0"},
2722 {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
2723 "PartialManifestWriteFail:1",
2724 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"},
2725 });
2726 SyncPoint::GetInstance()->EnableProcessing();
2727
2728 std::vector<ColumnFamilyHandle*> column_families;
2729 column_families.push_back(handles_[0]);
2730 column_families.push_back(handles_[1]);
f67539c2 2731 column_families.push_back(handles_[2]);
494da23a
TL
2732 std::vector<IngestExternalFileOptions> ifos(column_families.size());
2733 for (auto& ifo : ifos) {
2734 ifo.allow_global_seqno = true; // Always allow global_seqno
2735 // May or may not write global_seqno
2736 ifo.write_global_seqno = std::get<0>(GetParam());
2737 // Whether to verify block checksums before ingestion
2738 ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
2739 }
2740 std::vector<std::vector<std::pair<std::string, std::string>>> data;
2741 data.push_back(
2742 {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
2743 data.push_back(
2744 {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
f67539c2
TL
2745 data.push_back(
2746 {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")});
494da23a
TL
2747 // Resize the true_data vector upon construction to avoid re-alloc
2748 std::vector<std::map<std::string, std::string>> true_data(
2749 column_families.size());
2750 port::Thread ingest_thread([&]() {
2751 Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
2752 -1, true, true_data);
2753 ASSERT_NOK(s);
2754 });
2755 TEST_SYNC_POINT(
2756 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
2757 "PartialManifestWriteFail:0");
2758 fault_injection_env->SetFilesystemActive(false);
2759 TEST_SYNC_POINT(
2760 "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
2761 "PartialManifestWriteFail:1");
2762 ingest_thread.join();
2763
2764 fault_injection_env->DropUnsyncedFileData();
2765 fault_injection_env->SetFilesystemActive(true);
2766 Close();
f67539c2
TL
2767 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
2768 options);
2769 ASSERT_EQ(3, handles_.size());
494da23a
TL
2770 int cf = 0;
2771 for (const auto& verify_map : true_data) {
2772 for (const auto& elem : verify_map) {
2773 const std::string& key = elem.first;
2774 ASSERT_EQ("NOT_FOUND", Get(cf, key));
2775 }
2776 ++cf;
2777 }
2778 Close();
2779 Destroy(options, true /* delete_cf_paths */);
2780}
2781
f67539c2
TL
2782TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
2783 Options options = CurrentOptions();
2784 // Use large buffer to avoid memtable flush
2785 options.write_buffer_size = 1024 * 1024;
2786 options.two_write_queues = true;
2787 DestroyAndReopen(options);
2788
2789 ASSERT_OK(dbfull()->Put(WriteOptions(), "1000", "v1"));
2790 ASSERT_OK(dbfull()->Put(WriteOptions(), "1001", "v1"));
2791 ASSERT_OK(dbfull()->Put(WriteOptions(), "9999", "v1"));
2792
2793 // Put one key which is overlap with keys in memtable.
2794 // It will trigger flushing memtable and require this thread is
2795 // currently at the front of the 2nd writer queue. We must make
2796 // sure that it won't enter the 2nd writer queue for the second time.
2797 std::vector<std::pair<std::string, std::string>> data;
2798 data.push_back(std::make_pair("1001", "v2"));
2799 GenerateAndAddExternalFile(options, data);
2800}
2801
494da23a
TL
2802INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
2803 testing::Values(std::make_tuple(false, false),
2804 std::make_tuple(false, true),
2805 std::make_tuple(true, false),
2806 std::make_tuple(true, true)));
2807
f67539c2
TL
2808INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest,
2809 ExternSSTFileLinkFailFallbackTest,
2810 testing::Values(std::make_tuple(true, false),
2811 std::make_tuple(true, true),
2812 std::make_tuple(false, false)));
2813
2814} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
2815
2816int main(int argc, char** argv) {
f67539c2 2817 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
7c673cae
FG
2818 ::testing::InitGoogleTest(&argc, argv);
2819 return RUN_ALL_TESTS();
2820}
2821
2822#else
2823#include <stdio.h>
2824
// ROCKSDB_LITE build: the external SST file tests are compiled out, so the
// binary only prints a skip notice to stderr and exits with status 0.
int main(int /*argc*/, char** /*argv*/) {
  fputs(
      "SKIPPED as External SST File Writer and Ingestion are not supported "
      "in ROCKSDB_LITE\n",
      stderr);
  return 0;
}
2831
2832#endif // !ROCKSDB_LITE