]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include <functional> | |
9 | #include "db/db_test_util.h" | |
f67539c2 | 10 | #include "file/filename.h" |
7c673cae FG |
11 | #include "port/port.h" |
12 | #include "port/stack_trace.h" | |
13 | #include "rocksdb/sst_file_writer.h" | |
f67539c2 TL |
14 | #include "test_util/fault_injection_test_env.h" |
15 | #include "test_util/testutil.h" | |
7c673cae | 16 | |
f67539c2 TL |
17 | namespace ROCKSDB_NAMESPACE { |
18 | ||
19 | // A test environment that can be configured to fail the Link operation. | |
20 | class ExternalSSTTestEnv : public EnvWrapper { | |
21 | public: | |
22 | ExternalSSTTestEnv(Env* t, bool fail_link) | |
23 | : EnvWrapper(t), fail_link_(fail_link) {} | |
24 | ||
25 | Status LinkFile(const std::string& s, const std::string& t) override { | |
26 | if (fail_link_) { | |
27 | return Status::NotSupported("Link failed"); | |
28 | } | |
29 | return target()->LinkFile(s, t); | |
30 | } | |
31 | ||
32 | void set_fail_link(bool fail_link) { fail_link_ = fail_link; } | |
33 | ||
34 | private: | |
35 | bool fail_link_; | |
36 | }; | |
37 | ||
38 | class ExternSSTFileLinkFailFallbackTest | |
39 | : public DBTestBase, | |
40 | public ::testing::WithParamInterface<std::tuple<bool, bool>> { | |
41 | public: | |
42 | ExternSSTFileLinkFailFallbackTest() | |
43 | : DBTestBase("/external_sst_file_test"), | |
44 | test_env_(new ExternalSSTTestEnv(env_, true)) { | |
45 | sst_files_dir_ = dbname_ + "/sst_files/"; | |
46 | test::DestroyDir(env_, sst_files_dir_); | |
47 | env_->CreateDir(sst_files_dir_); | |
48 | options_ = CurrentOptions(); | |
49 | options_.disable_auto_compactions = true; | |
50 | options_.env = test_env_; | |
51 | } | |
52 | ||
53 | void TearDown() override { | |
54 | delete db_; | |
55 | db_ = nullptr; | |
56 | ASSERT_OK(DestroyDB(dbname_, options_)); | |
57 | delete test_env_; | |
58 | test_env_ = nullptr; | |
59 | } | |
60 | ||
61 | protected: | |
62 | std::string sst_files_dir_; | |
63 | Options options_; | |
64 | ExternalSSTTestEnv* test_env_; | |
65 | }; | |
7c673cae | 66 | |
494da23a TL |
67 | class ExternalSSTFileTest |
68 | : public DBTestBase, | |
69 | public ::testing::WithParamInterface<std::tuple<bool, bool>> { | |
7c673cae FG |
70 | public: |
71 | ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") { | |
72 | sst_files_dir_ = dbname_ + "/sst_files/"; | |
73 | DestroyAndRecreateExternalSSTFilesDir(); | |
74 | } | |
75 | ||
76 | void DestroyAndRecreateExternalSSTFilesDir() { | |
77 | test::DestroyDir(env_, sst_files_dir_); | |
78 | env_->CreateDir(sst_files_dir_); | |
79 | } | |
80 | ||
494da23a TL |
81 | Status GenerateOneExternalFile( |
82 | const Options& options, ColumnFamilyHandle* cfh, | |
83 | std::vector<std::pair<std::string, std::string>>& data, int file_id, | |
84 | bool sort_data, std::string* external_file_path, | |
85 | std::map<std::string, std::string>* true_data) { | |
7c673cae | 86 | // Generate a file id if not provided |
494da23a TL |
87 | if (-1 == file_id) { |
88 | file_id = (++last_file_id_); | |
7c673cae | 89 | } |
7c673cae FG |
90 | // Sort data if asked to do so |
91 | if (sort_data) { | |
92 | std::sort(data.begin(), data.end(), | |
93 | [&](const std::pair<std::string, std::string>& e1, | |
94 | const std::pair<std::string, std::string>& e2) { | |
95 | return options.comparator->Compare(e1.first, e2.first) < 0; | |
96 | }); | |
97 | auto uniq_iter = std::unique( | |
98 | data.begin(), data.end(), | |
99 | [&](const std::pair<std::string, std::string>& e1, | |
100 | const std::pair<std::string, std::string>& e2) { | |
101 | return options.comparator->Compare(e1.first, e2.first) == 0; | |
102 | }); | |
103 | data.resize(uniq_iter - data.begin()); | |
104 | } | |
105 | std::string file_path = sst_files_dir_ + ToString(file_id); | |
106 | SstFileWriter sst_file_writer(EnvOptions(), options, cfh); | |
7c673cae FG |
107 | Status s = sst_file_writer.Open(file_path); |
108 | if (!s.ok()) { | |
109 | return s; | |
110 | } | |
494da23a | 111 | for (const auto& entry : data) { |
11fdf7f2 | 112 | s = sst_file_writer.Put(entry.first, entry.second); |
7c673cae FG |
113 | if (!s.ok()) { |
114 | sst_file_writer.Finish(); | |
115 | return s; | |
116 | } | |
117 | } | |
118 | s = sst_file_writer.Finish(); | |
494da23a TL |
119 | if (s.ok() && external_file_path != nullptr) { |
120 | *external_file_path = file_path; | |
7c673cae | 121 | } |
494da23a TL |
122 | if (s.ok() && nullptr != true_data) { |
123 | for (const auto& entry : data) { | |
124 | true_data->insert({entry.first, entry.second}); | |
7c673cae FG |
125 | } |
126 | } | |
7c673cae FG |
127 | return s; |
128 | } | |
129 | ||
494da23a TL |
130 | Status GenerateAndAddExternalFile( |
131 | const Options options, | |
11fdf7f2 | 132 | std::vector<std::pair<std::string, std::string>> data, int file_id = -1, |
494da23a TL |
133 | bool allow_global_seqno = false, bool write_global_seqno = false, |
134 | bool verify_checksums_before_ingest = true, bool ingest_behind = false, | |
11fdf7f2 TL |
135 | bool sort_data = false, |
136 | std::map<std::string, std::string>* true_data = nullptr, | |
137 | ColumnFamilyHandle* cfh = nullptr) { | |
138 | // Generate a file id if not provided | |
139 | if (file_id == -1) { | |
140 | file_id = last_file_id_ + 1; | |
141 | last_file_id_++; | |
142 | } | |
143 | ||
144 | // Sort data if asked to do so | |
145 | if (sort_data) { | |
146 | std::sort(data.begin(), data.end(), | |
147 | [&](const std::pair<std::string, std::string>& e1, | |
148 | const std::pair<std::string, std::string>& e2) { | |
149 | return options.comparator->Compare(e1.first, e2.first) < 0; | |
150 | }); | |
151 | auto uniq_iter = std::unique( | |
152 | data.begin(), data.end(), | |
153 | [&](const std::pair<std::string, std::string>& e1, | |
154 | const std::pair<std::string, std::string>& e2) { | |
155 | return options.comparator->Compare(e1.first, e2.first) == 0; | |
156 | }); | |
157 | data.resize(uniq_iter - data.begin()); | |
158 | } | |
159 | std::string file_path = sst_files_dir_ + ToString(file_id); | |
160 | SstFileWriter sst_file_writer(EnvOptions(), options, cfh); | |
161 | ||
162 | Status s = sst_file_writer.Open(file_path); | |
163 | if (!s.ok()) { | |
164 | return s; | |
165 | } | |
166 | for (auto& entry : data) { | |
167 | s = sst_file_writer.Put(entry.first, entry.second); | |
168 | if (!s.ok()) { | |
169 | sst_file_writer.Finish(); | |
170 | return s; | |
171 | } | |
172 | } | |
173 | s = sst_file_writer.Finish(); | |
174 | ||
175 | if (s.ok()) { | |
494da23a TL |
176 | IngestExternalFileOptions ifo; |
177 | ifo.allow_global_seqno = allow_global_seqno; | |
178 | ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false; | |
179 | ifo.verify_checksums_before_ingest = verify_checksums_before_ingest; | |
180 | ifo.ingest_behind = ingest_behind; | |
11fdf7f2 TL |
181 | if (cfh) { |
182 | s = db_->IngestExternalFile(cfh, {file_path}, ifo); | |
183 | } else { | |
184 | s = db_->IngestExternalFile({file_path}, ifo); | |
185 | } | |
186 | } | |
187 | ||
188 | if (s.ok() && true_data) { | |
189 | for (auto& entry : data) { | |
190 | (*true_data)[entry.first] = entry.second; | |
191 | } | |
192 | } | |
193 | ||
194 | return s; | |
195 | } | |
196 | ||
494da23a TL |
197 | Status GenerateAndAddExternalFiles( |
198 | const Options& options, | |
199 | const std::vector<ColumnFamilyHandle*>& column_families, | |
200 | const std::vector<IngestExternalFileOptions>& ifos, | |
201 | std::vector<std::vector<std::pair<std::string, std::string>>>& data, | |
202 | int file_id, bool sort_data, | |
203 | std::vector<std::map<std::string, std::string>>& true_data) { | |
204 | if (-1 == file_id) { | |
205 | file_id = (++last_file_id_); | |
206 | } | |
207 | // Generate external SST files, one for each column family | |
208 | size_t num_cfs = column_families.size(); | |
209 | assert(ifos.size() == num_cfs); | |
210 | assert(data.size() == num_cfs); | |
211 | Status s; | |
212 | std::vector<IngestExternalFileArg> args(num_cfs); | |
213 | for (size_t i = 0; i != num_cfs; ++i) { | |
214 | std::string external_file_path; | |
215 | s = GenerateOneExternalFile( | |
216 | options, column_families[i], data[i], file_id, sort_data, | |
217 | &external_file_path, | |
218 | true_data.size() == num_cfs ? &true_data[i] : nullptr); | |
219 | if (!s.ok()) { | |
220 | return s; | |
221 | } | |
222 | ++file_id; | |
11fdf7f2 | 223 | |
494da23a TL |
224 | args[i].column_family = column_families[i]; |
225 | args[i].external_files.push_back(external_file_path); | |
226 | args[i].options = ifos[i]; | |
227 | } | |
228 | s = db_->IngestExternalFiles(args); | |
229 | return s; | |
230 | } | |
11fdf7f2 | 231 | |
7c673cae FG |
232 | Status GenerateAndAddExternalFile( |
233 | const Options options, std::vector<std::pair<int, std::string>> data, | |
494da23a TL |
234 | int file_id = -1, bool allow_global_seqno = false, |
235 | bool write_global_seqno = false, | |
236 | bool verify_checksums_before_ingest = true, bool ingest_behind = false, | |
237 | bool sort_data = false, | |
7c673cae FG |
238 | std::map<std::string, std::string>* true_data = nullptr, |
239 | ColumnFamilyHandle* cfh = nullptr) { | |
240 | std::vector<std::pair<std::string, std::string>> file_data; | |
241 | for (auto& entry : data) { | |
242 | file_data.emplace_back(Key(entry.first), entry.second); | |
243 | } | |
244 | return GenerateAndAddExternalFile(options, file_data, file_id, | |
494da23a TL |
245 | allow_global_seqno, write_global_seqno, |
246 | verify_checksums_before_ingest, | |
247 | ingest_behind, sort_data, true_data, cfh); | |
7c673cae FG |
248 | } |
249 | ||
250 | Status GenerateAndAddExternalFile( | |
251 | const Options options, std::vector<int> keys, int file_id = -1, | |
494da23a TL |
252 | bool allow_global_seqno = false, bool write_global_seqno = false, |
253 | bool verify_checksums_before_ingest = true, bool ingest_behind = false, | |
254 | bool sort_data = false, | |
7c673cae FG |
255 | std::map<std::string, std::string>* true_data = nullptr, |
256 | ColumnFamilyHandle* cfh = nullptr) { | |
257 | std::vector<std::pair<std::string, std::string>> file_data; | |
258 | for (auto& k : keys) { | |
259 | file_data.emplace_back(Key(k), Key(k) + ToString(file_id)); | |
260 | } | |
261 | return GenerateAndAddExternalFile(options, file_data, file_id, | |
494da23a TL |
262 | allow_global_seqno, write_global_seqno, |
263 | verify_checksums_before_ingest, | |
264 | ingest_behind, sort_data, true_data, cfh); | |
7c673cae FG |
265 | } |
266 | ||
267 | Status DeprecatedAddFile(const std::vector<std::string>& files, | |
268 | bool move_files = false, | |
494da23a TL |
269 | bool skip_snapshot_check = false, |
270 | bool skip_write_global_seqno = false) { | |
7c673cae FG |
271 | IngestExternalFileOptions opts; |
272 | opts.move_files = move_files; | |
273 | opts.snapshot_consistency = !skip_snapshot_check; | |
274 | opts.allow_global_seqno = false; | |
275 | opts.allow_blocking_flush = false; | |
494da23a | 276 | opts.write_global_seqno = !skip_write_global_seqno; |
7c673cae FG |
277 | return db_->IngestExternalFile(files, opts); |
278 | } | |
279 | ||
494da23a | 280 | ~ExternalSSTFileTest() override { test::DestroyDir(env_, sst_files_dir_); } |
7c673cae FG |
281 | |
282 | protected: | |
283 | int last_file_id_ = 0; | |
284 | std::string sst_files_dir_; | |
285 | }; | |
286 | ||
287 | TEST_F(ExternalSSTFileTest, Basic) { | |
288 | do { | |
289 | Options options = CurrentOptions(); | |
290 | ||
291 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
292 | ||
293 | // Current file size should be 0 after sst_file_writer init and before open a file. | |
294 | ASSERT_EQ(sst_file_writer.FileSize(), 0); | |
295 | ||
296 | // file1.sst (0 => 99) | |
297 | std::string file1 = sst_files_dir_ + "file1.sst"; | |
298 | ASSERT_OK(sst_file_writer.Open(file1)); | |
299 | for (int k = 0; k < 100; k++) { | |
11fdf7f2 | 300 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
301 | } |
302 | ExternalSstFileInfo file1_info; | |
303 | Status s = sst_file_writer.Finish(&file1_info); | |
304 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
305 | ||
306 | // Current file size should be non-zero after success write. | |
307 | ASSERT_GT(sst_file_writer.FileSize(), 0); | |
308 | ||
309 | ASSERT_EQ(file1_info.file_path, file1); | |
310 | ASSERT_EQ(file1_info.num_entries, 100); | |
311 | ASSERT_EQ(file1_info.smallest_key, Key(0)); | |
312 | ASSERT_EQ(file1_info.largest_key, Key(99)); | |
11fdf7f2 TL |
313 | ASSERT_EQ(file1_info.num_range_del_entries, 0); |
314 | ASSERT_EQ(file1_info.smallest_range_del_key, ""); | |
315 | ASSERT_EQ(file1_info.largest_range_del_key, ""); | |
7c673cae | 316 | // sst_file_writer already finished, cannot add this value |
11fdf7f2 | 317 | s = sst_file_writer.Put(Key(100), "bad_val"); |
7c673cae FG |
318 | ASSERT_FALSE(s.ok()) << s.ToString(); |
319 | ||
320 | // file2.sst (100 => 199) | |
321 | std::string file2 = sst_files_dir_ + "file2.sst"; | |
322 | ASSERT_OK(sst_file_writer.Open(file2)); | |
323 | for (int k = 100; k < 200; k++) { | |
11fdf7f2 | 324 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
325 | } |
326 | // Cannot add this key because it's not after last added key | |
11fdf7f2 | 327 | s = sst_file_writer.Put(Key(99), "bad_val"); |
7c673cae FG |
328 | ASSERT_FALSE(s.ok()) << s.ToString(); |
329 | ExternalSstFileInfo file2_info; | |
330 | s = sst_file_writer.Finish(&file2_info); | |
331 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
332 | ASSERT_EQ(file2_info.file_path, file2); | |
333 | ASSERT_EQ(file2_info.num_entries, 100); | |
334 | ASSERT_EQ(file2_info.smallest_key, Key(100)); | |
335 | ASSERT_EQ(file2_info.largest_key, Key(199)); | |
336 | ||
337 | // file3.sst (195 => 299) | |
338 | // This file values overlap with file2 values | |
339 | std::string file3 = sst_files_dir_ + "file3.sst"; | |
340 | ASSERT_OK(sst_file_writer.Open(file3)); | |
341 | for (int k = 195; k < 300; k++) { | |
11fdf7f2 | 342 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); |
7c673cae FG |
343 | } |
344 | ExternalSstFileInfo file3_info; | |
345 | s = sst_file_writer.Finish(&file3_info); | |
346 | ||
347 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
348 | // Current file size should be non-zero after success finish. | |
349 | ASSERT_GT(sst_file_writer.FileSize(), 0); | |
350 | ASSERT_EQ(file3_info.file_path, file3); | |
351 | ASSERT_EQ(file3_info.num_entries, 105); | |
352 | ASSERT_EQ(file3_info.smallest_key, Key(195)); | |
353 | ASSERT_EQ(file3_info.largest_key, Key(299)); | |
354 | ||
355 | // file4.sst (30 => 39) | |
356 | // This file values overlap with file1 values | |
357 | std::string file4 = sst_files_dir_ + "file4.sst"; | |
358 | ASSERT_OK(sst_file_writer.Open(file4)); | |
359 | for (int k = 30; k < 40; k++) { | |
11fdf7f2 | 360 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); |
7c673cae FG |
361 | } |
362 | ExternalSstFileInfo file4_info; | |
363 | s = sst_file_writer.Finish(&file4_info); | |
364 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
365 | ASSERT_EQ(file4_info.file_path, file4); | |
366 | ASSERT_EQ(file4_info.num_entries, 10); | |
367 | ASSERT_EQ(file4_info.smallest_key, Key(30)); | |
368 | ASSERT_EQ(file4_info.largest_key, Key(39)); | |
369 | ||
370 | // file5.sst (400 => 499) | |
371 | std::string file5 = sst_files_dir_ + "file5.sst"; | |
372 | ASSERT_OK(sst_file_writer.Open(file5)); | |
373 | for (int k = 400; k < 500; k++) { | |
11fdf7f2 | 374 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
375 | } |
376 | ExternalSstFileInfo file5_info; | |
377 | s = sst_file_writer.Finish(&file5_info); | |
378 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
379 | ASSERT_EQ(file5_info.file_path, file5); | |
380 | ASSERT_EQ(file5_info.num_entries, 100); | |
381 | ASSERT_EQ(file5_info.smallest_key, Key(400)); | |
382 | ASSERT_EQ(file5_info.largest_key, Key(499)); | |
383 | ||
11fdf7f2 TL |
384 | // file6.sst (delete 400 => 500) |
385 | std::string file6 = sst_files_dir_ + "file6.sst"; | |
386 | ASSERT_OK(sst_file_writer.Open(file6)); | |
387 | sst_file_writer.DeleteRange(Key(400), Key(500)); | |
388 | ExternalSstFileInfo file6_info; | |
389 | s = sst_file_writer.Finish(&file6_info); | |
390 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
391 | ASSERT_EQ(file6_info.file_path, file6); | |
392 | ASSERT_EQ(file6_info.num_entries, 0); | |
393 | ASSERT_EQ(file6_info.smallest_key, ""); | |
394 | ASSERT_EQ(file6_info.largest_key, ""); | |
395 | ASSERT_EQ(file6_info.num_range_del_entries, 1); | |
396 | ASSERT_EQ(file6_info.smallest_range_del_key, Key(400)); | |
397 | ASSERT_EQ(file6_info.largest_range_del_key, Key(500)); | |
398 | ||
399 | // file7.sst (delete 500 => 570, put 520 => 599 divisible by 2) | |
400 | std::string file7 = sst_files_dir_ + "file7.sst"; | |
401 | ASSERT_OK(sst_file_writer.Open(file7)); | |
402 | sst_file_writer.DeleteRange(Key(500), Key(550)); | |
403 | for (int k = 520; k < 560; k += 2) { | |
404 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); | |
405 | } | |
406 | sst_file_writer.DeleteRange(Key(525), Key(575)); | |
407 | for (int k = 560; k < 600; k += 2) { | |
408 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); | |
409 | } | |
410 | ExternalSstFileInfo file7_info; | |
411 | s = sst_file_writer.Finish(&file7_info); | |
412 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
413 | ASSERT_EQ(file7_info.file_path, file7); | |
414 | ASSERT_EQ(file7_info.num_entries, 40); | |
415 | ASSERT_EQ(file7_info.smallest_key, Key(520)); | |
416 | ASSERT_EQ(file7_info.largest_key, Key(598)); | |
417 | ASSERT_EQ(file7_info.num_range_del_entries, 2); | |
418 | ASSERT_EQ(file7_info.smallest_range_del_key, Key(500)); | |
419 | ASSERT_EQ(file7_info.largest_range_del_key, Key(575)); | |
420 | ||
421 | // file8.sst (delete 600 => 700) | |
422 | std::string file8 = sst_files_dir_ + "file8.sst"; | |
423 | ASSERT_OK(sst_file_writer.Open(file8)); | |
424 | sst_file_writer.DeleteRange(Key(600), Key(700)); | |
425 | ExternalSstFileInfo file8_info; | |
426 | s = sst_file_writer.Finish(&file8_info); | |
427 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
428 | ASSERT_EQ(file8_info.file_path, file8); | |
429 | ASSERT_EQ(file8_info.num_entries, 0); | |
430 | ASSERT_EQ(file8_info.smallest_key, ""); | |
431 | ASSERT_EQ(file8_info.largest_key, ""); | |
432 | ASSERT_EQ(file8_info.num_range_del_entries, 1); | |
433 | ASSERT_EQ(file8_info.smallest_range_del_key, Key(600)); | |
434 | ASSERT_EQ(file8_info.largest_range_del_key, Key(700)); | |
435 | ||
7c673cae FG |
436 | // Cannot create an empty sst file |
437 | std::string file_empty = sst_files_dir_ + "file_empty.sst"; | |
438 | ExternalSstFileInfo file_empty_info; | |
439 | s = sst_file_writer.Finish(&file_empty_info); | |
440 | ASSERT_NOK(s); | |
441 | ||
442 | DestroyAndReopen(options); | |
443 | // Add file using file path | |
444 | s = DeprecatedAddFile({file1}); | |
445 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
446 | ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); | |
447 | for (int k = 0; k < 100; k++) { | |
448 | ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); | |
449 | } | |
450 | ||
451 | // Add file while holding a snapshot will fail | |
452 | const Snapshot* s1 = db_->GetSnapshot(); | |
453 | if (s1 != nullptr) { | |
454 | ASSERT_NOK(DeprecatedAddFile({file2})); | |
455 | db_->ReleaseSnapshot(s1); | |
456 | } | |
457 | // We can add the file after releaseing the snapshot | |
458 | ASSERT_OK(DeprecatedAddFile({file2})); | |
459 | ||
460 | ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); | |
461 | for (int k = 0; k < 200; k++) { | |
462 | ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); | |
463 | } | |
464 | ||
465 | // This file has overlapping values with the existing data | |
466 | s = DeprecatedAddFile({file3}); | |
467 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
468 | ||
469 | // This file has overlapping values with the existing data | |
470 | s = DeprecatedAddFile({file4}); | |
471 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
472 | ||
473 | // Overwrite values of keys divisible by 5 | |
474 | for (int k = 0; k < 200; k += 5) { | |
475 | ASSERT_OK(Put(Key(k), Key(k) + "_val_new")); | |
476 | } | |
477 | ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); | |
478 | ||
479 | // Key range of file5 (400 => 499) dont overlap with any keys in DB | |
480 | ASSERT_OK(DeprecatedAddFile({file5})); | |
481 | ||
11fdf7f2 TL |
482 | // This file has overlapping values with the existing data |
483 | s = DeprecatedAddFile({file6}); | |
484 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
485 | ||
486 | // Key range of file7 (500 => 598) dont overlap with any keys in DB | |
487 | ASSERT_OK(DeprecatedAddFile({file7})); | |
488 | ||
489 | // Key range of file7 (600 => 700) dont overlap with any keys in DB | |
490 | ASSERT_OK(DeprecatedAddFile({file8})); | |
491 | ||
7c673cae FG |
492 | // Make sure values are correct before and after flush/compaction |
493 | for (int i = 0; i < 2; i++) { | |
494 | for (int k = 0; k < 200; k++) { | |
495 | std::string value = Key(k) + "_val"; | |
496 | if (k % 5 == 0) { | |
497 | value += "_new"; | |
498 | } | |
499 | ASSERT_EQ(Get(Key(k)), value); | |
500 | } | |
501 | for (int k = 400; k < 500; k++) { | |
502 | std::string value = Key(k) + "_val"; | |
503 | ASSERT_EQ(Get(Key(k)), value); | |
504 | } | |
11fdf7f2 TL |
505 | for (int k = 500; k < 600; k++) { |
506 | std::string value = Key(k) + "_val"; | |
507 | if (k < 520 || k % 2 == 1) { | |
508 | value = "NOT_FOUND"; | |
509 | } | |
510 | ASSERT_EQ(Get(Key(k)), value); | |
511 | } | |
7c673cae FG |
512 | ASSERT_OK(Flush()); |
513 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
514 | } | |
515 | ||
516 | Close(); | |
517 | options.disable_auto_compactions = true; | |
518 | Reopen(options); | |
519 | ||
520 | // Delete keys in range (400 => 499) | |
521 | for (int k = 400; k < 500; k++) { | |
522 | ASSERT_OK(Delete(Key(k))); | |
523 | } | |
524 | // We deleted range (400 => 499) but cannot add file5 because | |
525 | // of the range tombstones | |
526 | ASSERT_NOK(DeprecatedAddFile({file5})); | |
527 | ||
528 | // Compacting the DB will remove the tombstones | |
529 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
530 | ||
531 | // Now we can add the file | |
532 | ASSERT_OK(DeprecatedAddFile({file5})); | |
533 | ||
534 | // Verify values of file5 in DB | |
535 | for (int k = 400; k < 500; k++) { | |
536 | std::string value = Key(k) + "_val"; | |
537 | ASSERT_EQ(Get(Key(k)), value); | |
538 | } | |
539 | DestroyAndRecreateExternalSSTFilesDir(); | |
11fdf7f2 TL |
540 | } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction | |
541 | kRangeDelSkipConfigs)); | |
7c673cae | 542 | } |
494da23a | 543 | |
7c673cae FG |
544 | class SstFileWriterCollector : public TablePropertiesCollector { |
545 | public: | |
546 | explicit SstFileWriterCollector(const std::string prefix) : prefix_(prefix) { | |
547 | name_ = prefix_ + "_SstFileWriterCollector"; | |
548 | } | |
549 | ||
550 | const char* Name() const override { return name_.c_str(); } | |
551 | ||
552 | Status Finish(UserCollectedProperties* properties) override { | |
11fdf7f2 | 553 | std::string count = std::to_string(count_); |
7c673cae FG |
554 | *properties = UserCollectedProperties{ |
555 | {prefix_ + "_SstFileWriterCollector", "YES"}, | |
11fdf7f2 | 556 | {prefix_ + "_Count", count}, |
7c673cae FG |
557 | }; |
558 | return Status::OK(); | |
559 | } | |
560 | ||
11fdf7f2 TL |
561 | Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/, |
562 | EntryType /*type*/, SequenceNumber /*seq*/, | |
563 | uint64_t /*file_size*/) override { | |
7c673cae FG |
564 | ++count_; |
565 | return Status::OK(); | |
566 | } | |
567 | ||
494da23a | 568 | UserCollectedProperties GetReadableProperties() const override { |
7c673cae FG |
569 | return UserCollectedProperties{}; |
570 | } | |
571 | ||
572 | private: | |
573 | uint32_t count_ = 0; | |
574 | std::string prefix_; | |
575 | std::string name_; | |
576 | }; | |
577 | ||
578 | class SstFileWriterCollectorFactory : public TablePropertiesCollectorFactory { | |
579 | public: | |
580 | explicit SstFileWriterCollectorFactory(std::string prefix) | |
581 | : prefix_(prefix), num_created_(0) {} | |
494da23a | 582 | TablePropertiesCollector* CreateTablePropertiesCollector( |
11fdf7f2 | 583 | TablePropertiesCollectorFactory::Context /*context*/) override { |
7c673cae FG |
584 | num_created_++; |
585 | return new SstFileWriterCollector(prefix_); | |
586 | } | |
587 | const char* Name() const override { return "SstFileWriterCollectorFactory"; } | |
588 | ||
589 | std::string prefix_; | |
590 | uint32_t num_created_; | |
591 | }; | |
592 | ||
593 | TEST_F(ExternalSSTFileTest, AddList) { | |
594 | do { | |
595 | Options options = CurrentOptions(); | |
596 | ||
597 | auto abc_collector = std::make_shared<SstFileWriterCollectorFactory>("abc"); | |
598 | auto xyz_collector = std::make_shared<SstFileWriterCollectorFactory>("xyz"); | |
599 | ||
600 | options.table_properties_collector_factories.emplace_back(abc_collector); | |
601 | options.table_properties_collector_factories.emplace_back(xyz_collector); | |
602 | ||
603 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
604 | ||
605 | // file1.sst (0 => 99) | |
606 | std::string file1 = sst_files_dir_ + "file1.sst"; | |
607 | ASSERT_OK(sst_file_writer.Open(file1)); | |
608 | for (int k = 0; k < 100; k++) { | |
11fdf7f2 | 609 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
610 | } |
611 | ExternalSstFileInfo file1_info; | |
612 | Status s = sst_file_writer.Finish(&file1_info); | |
613 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
614 | ASSERT_EQ(file1_info.file_path, file1); | |
615 | ASSERT_EQ(file1_info.num_entries, 100); | |
616 | ASSERT_EQ(file1_info.smallest_key, Key(0)); | |
617 | ASSERT_EQ(file1_info.largest_key, Key(99)); | |
618 | // sst_file_writer already finished, cannot add this value | |
11fdf7f2 | 619 | s = sst_file_writer.Put(Key(100), "bad_val"); |
7c673cae FG |
620 | ASSERT_FALSE(s.ok()) << s.ToString(); |
621 | ||
622 | // file2.sst (100 => 199) | |
623 | std::string file2 = sst_files_dir_ + "file2.sst"; | |
624 | ASSERT_OK(sst_file_writer.Open(file2)); | |
625 | for (int k = 100; k < 200; k++) { | |
11fdf7f2 | 626 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
627 | } |
628 | // Cannot add this key because it's not after last added key | |
11fdf7f2 | 629 | s = sst_file_writer.Put(Key(99), "bad_val"); |
7c673cae FG |
630 | ASSERT_FALSE(s.ok()) << s.ToString(); |
631 | ExternalSstFileInfo file2_info; | |
632 | s = sst_file_writer.Finish(&file2_info); | |
633 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
634 | ASSERT_EQ(file2_info.file_path, file2); | |
635 | ASSERT_EQ(file2_info.num_entries, 100); | |
636 | ASSERT_EQ(file2_info.smallest_key, Key(100)); | |
637 | ASSERT_EQ(file2_info.largest_key, Key(199)); | |
638 | ||
639 | // file3.sst (195 => 199) | |
640 | // This file values overlap with file2 values | |
641 | std::string file3 = sst_files_dir_ + "file3.sst"; | |
642 | ASSERT_OK(sst_file_writer.Open(file3)); | |
643 | for (int k = 195; k < 200; k++) { | |
11fdf7f2 | 644 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); |
7c673cae FG |
645 | } |
646 | ExternalSstFileInfo file3_info; | |
647 | s = sst_file_writer.Finish(&file3_info); | |
648 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
649 | ASSERT_EQ(file3_info.file_path, file3); | |
650 | ASSERT_EQ(file3_info.num_entries, 5); | |
651 | ASSERT_EQ(file3_info.smallest_key, Key(195)); | |
652 | ASSERT_EQ(file3_info.largest_key, Key(199)); | |
653 | ||
654 | // file4.sst (30 => 39) | |
655 | // This file values overlap with file1 values | |
656 | std::string file4 = sst_files_dir_ + "file4.sst"; | |
657 | ASSERT_OK(sst_file_writer.Open(file4)); | |
658 | for (int k = 30; k < 40; k++) { | |
11fdf7f2 | 659 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); |
7c673cae FG |
660 | } |
661 | ExternalSstFileInfo file4_info; | |
662 | s = sst_file_writer.Finish(&file4_info); | |
663 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
664 | ASSERT_EQ(file4_info.file_path, file4); | |
665 | ASSERT_EQ(file4_info.num_entries, 10); | |
666 | ASSERT_EQ(file4_info.smallest_key, Key(30)); | |
667 | ASSERT_EQ(file4_info.largest_key, Key(39)); | |
668 | ||
669 | // file5.sst (200 => 299) | |
670 | std::string file5 = sst_files_dir_ + "file5.sst"; | |
671 | ASSERT_OK(sst_file_writer.Open(file5)); | |
672 | for (int k = 200; k < 300; k++) { | |
11fdf7f2 | 673 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
674 | } |
675 | ExternalSstFileInfo file5_info; | |
676 | s = sst_file_writer.Finish(&file5_info); | |
677 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
678 | ASSERT_EQ(file5_info.file_path, file5); | |
679 | ASSERT_EQ(file5_info.num_entries, 100); | |
680 | ASSERT_EQ(file5_info.smallest_key, Key(200)); | |
681 | ASSERT_EQ(file5_info.largest_key, Key(299)); | |
682 | ||
11fdf7f2 TL |
683 | // file6.sst (delete 0 => 100) |
684 | std::string file6 = sst_files_dir_ + "file6.sst"; | |
685 | ASSERT_OK(sst_file_writer.Open(file6)); | |
686 | ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75))); | |
687 | ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100))); | |
688 | ExternalSstFileInfo file6_info; | |
689 | s = sst_file_writer.Finish(&file6_info); | |
690 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
691 | ASSERT_EQ(file6_info.file_path, file6); | |
692 | ASSERT_EQ(file6_info.num_entries, 0); | |
693 | ASSERT_EQ(file6_info.smallest_key, ""); | |
694 | ASSERT_EQ(file6_info.largest_key, ""); | |
695 | ASSERT_EQ(file6_info.num_range_del_entries, 2); | |
696 | ASSERT_EQ(file6_info.smallest_range_del_key, Key(0)); | |
697 | ASSERT_EQ(file6_info.largest_range_del_key, Key(100)); | |
698 | ||
f67539c2 | 699 | // file7.sst (delete 99 => 201) |
11fdf7f2 TL |
700 | std::string file7 = sst_files_dir_ + "file7.sst"; |
701 | ASSERT_OK(sst_file_writer.Open(file7)); | |
f67539c2 | 702 | ASSERT_OK(sst_file_writer.DeleteRange(Key(99), Key(201))); |
11fdf7f2 TL |
703 | ExternalSstFileInfo file7_info; |
704 | s = sst_file_writer.Finish(&file7_info); | |
705 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
706 | ASSERT_EQ(file7_info.file_path, file7); | |
707 | ASSERT_EQ(file7_info.num_entries, 0); | |
708 | ASSERT_EQ(file7_info.smallest_key, ""); | |
709 | ASSERT_EQ(file7_info.largest_key, ""); | |
710 | ASSERT_EQ(file7_info.num_range_del_entries, 1); | |
f67539c2 TL |
711 | ASSERT_EQ(file7_info.smallest_range_del_key, Key(99)); |
712 | ASSERT_EQ(file7_info.largest_range_del_key, Key(201)); | |
11fdf7f2 | 713 | |
7c673cae FG |
714 | // list 1 has internal key range conflict |
715 | std::vector<std::string> file_list0({file1, file2}); | |
716 | std::vector<std::string> file_list1({file3, file2, file1}); | |
717 | std::vector<std::string> file_list2({file5}); | |
718 | std::vector<std::string> file_list3({file3, file4}); | |
11fdf7f2 TL |
719 | std::vector<std::string> file_list4({file5, file7}); |
720 | std::vector<std::string> file_list5({file6, file7}); | |
7c673cae FG |
721 | |
722 | DestroyAndReopen(options); | |
723 | ||
11fdf7f2 | 724 | // These lists of files have key ranges that overlap with each other |
7c673cae FG |
725 | s = DeprecatedAddFile(file_list1); |
726 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
f67539c2 | 727 | // Both of the following overlap on the range deletion tombstone. |
11fdf7f2 TL |
728 | s = DeprecatedAddFile(file_list4); |
729 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
730 | s = DeprecatedAddFile(file_list5); | |
731 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
7c673cae FG |
732 | |
733 | // Add files using file path list | |
734 | s = DeprecatedAddFile(file_list0); | |
735 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
736 | ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); | |
737 | for (int k = 0; k < 200; k++) { | |
738 | ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); | |
739 | } | |
740 | ||
741 | TablePropertiesCollection props; | |
742 | ASSERT_OK(db_->GetPropertiesOfAllTables(&props)); | |
743 | ASSERT_EQ(props.size(), 2); | |
744 | for (auto file_props : props) { | |
745 | auto user_props = file_props.second->user_collected_properties; | |
746 | ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES"); | |
747 | ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES"); | |
748 | ASSERT_EQ(user_props["abc_Count"], "100"); | |
749 | ASSERT_EQ(user_props["xyz_Count"], "100"); | |
750 | } | |
751 | ||
752 | // Add file while holding a snapshot will fail | |
753 | const Snapshot* s1 = db_->GetSnapshot(); | |
754 | if (s1 != nullptr) { | |
755 | ASSERT_NOK(DeprecatedAddFile(file_list2)); | |
756 | db_->ReleaseSnapshot(s1); | |
757 | } | |
758 | // We can add the file after releasing the snapshot | |
759 | ASSERT_OK(DeprecatedAddFile(file_list2)); | |
760 | ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); | |
761 | for (int k = 0; k < 300; k++) { | |
762 | ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); | |
763 | } | |
764 | ||
765 | ASSERT_OK(db_->GetPropertiesOfAllTables(&props)); | |
766 | ASSERT_EQ(props.size(), 3); | |
767 | for (auto file_props : props) { | |
768 | auto user_props = file_props.second->user_collected_properties; | |
769 | ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES"); | |
770 | ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES"); | |
771 | ASSERT_EQ(user_props["abc_Count"], "100"); | |
772 | ASSERT_EQ(user_props["xyz_Count"], "100"); | |
773 | } | |
774 | ||
775 | // This file list has overlapping values with the existing data | |
776 | s = DeprecatedAddFile(file_list3); | |
777 | ASSERT_FALSE(s.ok()) << s.ToString(); | |
778 | ||
779 | // Overwrite values of keys divisible by 5 | |
780 | for (int k = 0; k < 200; k += 5) { | |
781 | ASSERT_OK(Put(Key(k), Key(k) + "_val_new")); | |
782 | } | |
783 | ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); | |
784 | ||
785 | // Make sure values are correct before and after flush/compaction | |
786 | for (int i = 0; i < 2; i++) { | |
787 | for (int k = 0; k < 200; k++) { | |
788 | std::string value = Key(k) + "_val"; | |
789 | if (k % 5 == 0) { | |
790 | value += "_new"; | |
791 | } | |
792 | ASSERT_EQ(Get(Key(k)), value); | |
793 | } | |
794 | for (int k = 200; k < 300; k++) { | |
795 | std::string value = Key(k) + "_val"; | |
796 | ASSERT_EQ(Get(Key(k)), value); | |
797 | } | |
798 | ASSERT_OK(Flush()); | |
799 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
800 | } | |
801 | ||
802 | // Delete keys in range (200 => 299) | |
803 | for (int k = 200; k < 300; k++) { | |
804 | ASSERT_OK(Delete(Key(k))); | |
805 | } | |
806 | // We deleted range (200 => 299) but cannot add file5 because | |
807 | // of the range tombstones | |
808 | ASSERT_NOK(DeprecatedAddFile(file_list2)); | |
809 | ||
810 | // Compacting the DB will remove the tombstones | |
811 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
812 | ||
813 | // Now we can add the file | |
814 | ASSERT_OK(DeprecatedAddFile(file_list2)); | |
815 | ||
816 | // Verify values of file5 in DB | |
817 | for (int k = 200; k < 300; k++) { | |
818 | std::string value = Key(k) + "_val"; | |
819 | ASSERT_EQ(Get(Key(k)), value); | |
820 | } | |
821 | DestroyAndRecreateExternalSSTFilesDir(); | |
11fdf7f2 TL |
822 | } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction | |
823 | kRangeDelSkipConfigs)); | |
7c673cae FG |
824 | } |
825 | ||
826 | TEST_F(ExternalSSTFileTest, AddListAtomicity) { | |
827 | do { | |
828 | Options options = CurrentOptions(); | |
829 | ||
830 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
831 | ||
832 | // files[0].sst (0 => 99) | |
833 | // files[1].sst (100 => 199) | |
834 | // ... | |
835 | // file[8].sst (800 => 899) | |
836 | int n = 9; | |
837 | std::vector<std::string> files(n); | |
838 | std::vector<ExternalSstFileInfo> files_info(n); | |
839 | for (int i = 0; i < n; i++) { | |
840 | files[i] = sst_files_dir_ + "file" + std::to_string(i) + ".sst"; | |
841 | ASSERT_OK(sst_file_writer.Open(files[i])); | |
842 | for (int k = i * 100; k < (i + 1) * 100; k++) { | |
11fdf7f2 | 843 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
844 | } |
845 | Status s = sst_file_writer.Finish(&files_info[i]); | |
846 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
847 | ASSERT_EQ(files_info[i].file_path, files[i]); | |
848 | ASSERT_EQ(files_info[i].num_entries, 100); | |
849 | ASSERT_EQ(files_info[i].smallest_key, Key(i * 100)); | |
850 | ASSERT_EQ(files_info[i].largest_key, Key((i + 1) * 100 - 1)); | |
851 | } | |
852 | files.push_back(sst_files_dir_ + "file" + std::to_string(n) + ".sst"); | |
853 | auto s = DeprecatedAddFile(files); | |
854 | ASSERT_NOK(s) << s.ToString(); | |
855 | for (int k = 0; k < n * 100; k++) { | |
856 | ASSERT_EQ("NOT_FOUND", Get(Key(k))); | |
857 | } | |
858 | files.pop_back(); | |
859 | ASSERT_OK(DeprecatedAddFile(files)); | |
860 | for (int k = 0; k < n * 100; k++) { | |
861 | std::string value = Key(k) + "_val"; | |
862 | ASSERT_EQ(Get(Key(k)), value); | |
863 | } | |
864 | DestroyAndRecreateExternalSSTFilesDir(); | |
865 | } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); | |
866 | } | |
867 | // This test reporduce a bug that can happen in some cases if the DB started | |
868 | // purging obsolete files when we are adding an external sst file. | |
869 | // This situation may result in deleting the file while it's being added. | |
870 | TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) { | |
871 | Options options = CurrentOptions(); | |
872 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
873 | ||
874 | // file1.sst (0 => 500) | |
875 | std::string sst_file_path = sst_files_dir_ + "file1.sst"; | |
876 | Status s = sst_file_writer.Open(sst_file_path); | |
877 | ASSERT_OK(s); | |
878 | for (int i = 0; i < 500; i++) { | |
879 | std::string k = Key(i); | |
11fdf7f2 | 880 | s = sst_file_writer.Put(k, k + "_val"); |
7c673cae FG |
881 | ASSERT_OK(s); |
882 | } | |
883 | ||
884 | ExternalSstFileInfo sst_file_info; | |
885 | s = sst_file_writer.Finish(&sst_file_info); | |
886 | ASSERT_OK(s); | |
887 | ||
888 | options.delete_obsolete_files_period_micros = 0; | |
889 | options.disable_auto_compactions = true; | |
890 | DestroyAndReopen(options); | |
891 | ||
f67539c2 | 892 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 | 893 | "ExternalSstFileIngestionJob::Prepare:FileAdded", [&](void* /* arg */) { |
7c673cae FG |
894 | ASSERT_OK(Put("aaa", "bbb")); |
895 | ASSERT_OK(Flush()); | |
896 | ASSERT_OK(Put("aaa", "xxx")); | |
897 | ASSERT_OK(Flush()); | |
898 | db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
899 | }); | |
f67539c2 | 900 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
901 | |
902 | s = DeprecatedAddFile({sst_file_path}); | |
903 | ASSERT_OK(s); | |
904 | ||
905 | for (int i = 0; i < 500; i++) { | |
906 | std::string k = Key(i); | |
907 | std::string v = k + "_val"; | |
908 | ASSERT_EQ(Get(k), v); | |
909 | } | |
910 | ||
f67539c2 | 911 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
912 | } |
913 | ||
914 | TEST_F(ExternalSSTFileTest, SkipSnapshot) { | |
915 | Options options = CurrentOptions(); | |
916 | ||
917 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
918 | ||
919 | // file1.sst (0 => 99) | |
920 | std::string file1 = sst_files_dir_ + "file1.sst"; | |
921 | ASSERT_OK(sst_file_writer.Open(file1)); | |
922 | for (int k = 0; k < 100; k++) { | |
11fdf7f2 | 923 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
924 | } |
925 | ExternalSstFileInfo file1_info; | |
926 | Status s = sst_file_writer.Finish(&file1_info); | |
927 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
928 | ASSERT_EQ(file1_info.file_path, file1); | |
929 | ASSERT_EQ(file1_info.num_entries, 100); | |
930 | ASSERT_EQ(file1_info.smallest_key, Key(0)); | |
931 | ASSERT_EQ(file1_info.largest_key, Key(99)); | |
932 | ||
933 | // file2.sst (100 => 299) | |
934 | std::string file2 = sst_files_dir_ + "file2.sst"; | |
935 | ASSERT_OK(sst_file_writer.Open(file2)); | |
936 | for (int k = 100; k < 300; k++) { | |
11fdf7f2 | 937 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
938 | } |
939 | ExternalSstFileInfo file2_info; | |
940 | s = sst_file_writer.Finish(&file2_info); | |
941 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
942 | ASSERT_EQ(file2_info.file_path, file2); | |
943 | ASSERT_EQ(file2_info.num_entries, 200); | |
944 | ASSERT_EQ(file2_info.smallest_key, Key(100)); | |
945 | ASSERT_EQ(file2_info.largest_key, Key(299)); | |
946 | ||
947 | ASSERT_OK(DeprecatedAddFile({file1})); | |
948 | ||
949 | // Add file will fail when holding snapshot and use the default | |
950 | // skip_snapshot_check to false | |
951 | const Snapshot* s1 = db_->GetSnapshot(); | |
952 | if (s1 != nullptr) { | |
953 | ASSERT_NOK(DeprecatedAddFile({file2})); | |
954 | } | |
955 | ||
956 | // Add file will success when set skip_snapshot_check to true even db holding | |
957 | // snapshot | |
958 | if (s1 != nullptr) { | |
959 | ASSERT_OK(DeprecatedAddFile({file2}, false, true)); | |
960 | db_->ReleaseSnapshot(s1); | |
961 | } | |
962 | ||
963 | // file3.sst (300 => 399) | |
964 | std::string file3 = sst_files_dir_ + "file3.sst"; | |
965 | ASSERT_OK(sst_file_writer.Open(file3)); | |
966 | for (int k = 300; k < 400; k++) { | |
11fdf7f2 | 967 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); |
7c673cae FG |
968 | } |
969 | ExternalSstFileInfo file3_info; | |
970 | s = sst_file_writer.Finish(&file3_info); | |
971 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
972 | ASSERT_EQ(file3_info.file_path, file3); | |
973 | ASSERT_EQ(file3_info.num_entries, 100); | |
974 | ASSERT_EQ(file3_info.smallest_key, Key(300)); | |
975 | ASSERT_EQ(file3_info.largest_key, Key(399)); | |
976 | ||
977 | // check that we have change the old key | |
978 | ASSERT_EQ(Get(Key(300)), "NOT_FOUND"); | |
979 | const Snapshot* s2 = db_->GetSnapshot(); | |
980 | ASSERT_OK(DeprecatedAddFile({file3}, false, true)); | |
981 | ASSERT_EQ(Get(Key(300)), Key(300) + ("_val")); | |
982 | ASSERT_EQ(Get(Key(300), s2), Key(300) + ("_val")); | |
983 | ||
984 | db_->ReleaseSnapshot(s2); | |
985 | } | |
986 | ||
987 | TEST_F(ExternalSSTFileTest, MultiThreaded) { | |
988 | // Bulk load 10 files every file contain 1000 keys | |
989 | int num_files = 10; | |
990 | int keys_per_file = 1000; | |
991 | ||
992 | // Generate file names | |
993 | std::vector<std::string> file_names; | |
994 | for (int i = 0; i < num_files; i++) { | |
995 | std::string file_name = "file_" + ToString(i) + ".sst"; | |
996 | file_names.push_back(sst_files_dir_ + file_name); | |
997 | } | |
998 | ||
999 | do { | |
1000 | Options options = CurrentOptions(); | |
1001 | ||
1002 | std::atomic<int> thread_num(0); | |
1003 | std::function<void()> write_file_func = [&]() { | |
1004 | int file_idx = thread_num.fetch_add(1); | |
1005 | int range_start = file_idx * keys_per_file; | |
1006 | int range_end = range_start + keys_per_file; | |
1007 | ||
1008 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
1009 | ||
1010 | ASSERT_OK(sst_file_writer.Open(file_names[file_idx])); | |
1011 | ||
1012 | for (int k = range_start; k < range_end; k++) { | |
11fdf7f2 | 1013 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k))); |
7c673cae FG |
1014 | } |
1015 | ||
1016 | Status s = sst_file_writer.Finish(); | |
1017 | ASSERT_TRUE(s.ok()) << s.ToString(); | |
1018 | }; | |
1019 | // Write num_files files in parallel | |
1020 | std::vector<port::Thread> sst_writer_threads; | |
1021 | for (int i = 0; i < num_files; ++i) { | |
1022 | sst_writer_threads.emplace_back(write_file_func); | |
1023 | } | |
1024 | ||
1025 | for (auto& t : sst_writer_threads) { | |
1026 | t.join(); | |
1027 | } | |
1028 | ||
1029 | fprintf(stderr, "Wrote %d files (%d keys)\n", num_files, | |
1030 | num_files * keys_per_file); | |
1031 | ||
1032 | thread_num.store(0); | |
1033 | std::atomic<int> files_added(0); | |
1034 | // Thread 0 -> Load {f0,f1} | |
1035 | // Thread 1 -> Load {f0,f1} | |
1036 | // Thread 2 -> Load {f2,f3} | |
1037 | // Thread 3 -> Load {f2,f3} | |
1038 | // Thread 4 -> Load {f4,f5} | |
1039 | // Thread 5 -> Load {f4,f5} | |
1040 | // ... | |
1041 | std::function<void()> load_file_func = [&]() { | |
1042 | // We intentionally add every file twice, and assert that it was added | |
1043 | // only once and the other add failed | |
1044 | int thread_id = thread_num.fetch_add(1); | |
1045 | int file_idx = (thread_id / 2) * 2; | |
1046 | // sometimes we use copy, sometimes link .. the result should be the same | |
1047 | bool move_file = (thread_id % 3 == 0); | |
1048 | ||
1049 | std::vector<std::string> files_to_add; | |
1050 | ||
1051 | files_to_add = {file_names[file_idx]}; | |
1052 | if (static_cast<size_t>(file_idx + 1) < file_names.size()) { | |
1053 | files_to_add.push_back(file_names[file_idx + 1]); | |
1054 | } | |
1055 | ||
1056 | Status s = DeprecatedAddFile(files_to_add, move_file); | |
1057 | if (s.ok()) { | |
1058 | files_added += static_cast<int>(files_to_add.size()); | |
1059 | } | |
1060 | }; | |
1061 | ||
1062 | // Bulk load num_files files in parallel | |
1063 | std::vector<port::Thread> add_file_threads; | |
1064 | DestroyAndReopen(options); | |
1065 | for (int i = 0; i < num_files; ++i) { | |
1066 | add_file_threads.emplace_back(load_file_func); | |
1067 | } | |
1068 | ||
1069 | for (auto& t : add_file_threads) { | |
1070 | t.join(); | |
1071 | } | |
1072 | ASSERT_EQ(files_added.load(), num_files); | |
1073 | fprintf(stderr, "Loaded %d files (%d keys)\n", num_files, | |
1074 | num_files * keys_per_file); | |
1075 | ||
1076 | // Overwrite values of keys divisible by 100 | |
1077 | for (int k = 0; k < num_files * keys_per_file; k += 100) { | |
1078 | std::string key = Key(k); | |
1079 | Status s = Put(key, key + "_new"); | |
1080 | ASSERT_TRUE(s.ok()); | |
1081 | } | |
1082 | ||
1083 | for (int i = 0; i < 2; i++) { | |
1084 | // Make sure the values are correct before and after flush/compaction | |
1085 | for (int k = 0; k < num_files * keys_per_file; ++k) { | |
1086 | std::string key = Key(k); | |
1087 | std::string value = (k % 100 == 0) ? (key + "_new") : key; | |
1088 | ASSERT_EQ(Get(key), value); | |
1089 | } | |
1090 | ASSERT_OK(Flush()); | |
1091 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
1092 | } | |
1093 | ||
1094 | fprintf(stderr, "Verified %d values\n", num_files * keys_per_file); | |
1095 | DestroyAndRecreateExternalSSTFilesDir(); | |
1096 | } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); | |
1097 | } | |
1098 | ||
1099 | TEST_F(ExternalSSTFileTest, OverlappingRanges) { | |
1100 | Random rnd(301); | |
11fdf7f2 | 1101 | SequenceNumber assigned_seqno = 0; |
f67539c2 TL |
1102 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
1103 | "ExternalSstFileIngestionJob::Run", [&assigned_seqno](void* arg) { | |
1104 | ASSERT_TRUE(arg != nullptr); | |
1105 | assigned_seqno = *(static_cast<SequenceNumber*>(arg)); | |
1106 | }); | |
7c673cae | 1107 | bool need_flush = false; |
f67539c2 TL |
1108 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
1109 | "DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) { | |
1110 | ASSERT_TRUE(arg != nullptr); | |
1111 | need_flush = *(static_cast<bool*>(arg)); | |
1112 | }); | |
7c673cae | 1113 | bool overlap_with_db = false; |
f67539c2 | 1114 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
7c673cae FG |
1115 | "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", |
1116 | [&overlap_with_db](void* arg) { | |
1117 | ASSERT_TRUE(arg != nullptr); | |
1118 | overlap_with_db = *(static_cast<bool*>(arg)); | |
1119 | }); | |
f67539c2 | 1120 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
1121 | do { |
1122 | Options options = CurrentOptions(); | |
1123 | DestroyAndReopen(options); | |
1124 | ||
1125 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
1126 | ||
1127 | printf("Option config = %d\n", option_config_); | |
1128 | std::vector<std::pair<int, int>> key_ranges; | |
11fdf7f2 | 1129 | for (int i = 0; i < 100; i++) { |
7c673cae FG |
1130 | int range_start = rnd.Uniform(20000); |
1131 | int keys_per_range = 10 + rnd.Uniform(41); | |
1132 | ||
1133 | key_ranges.emplace_back(range_start, range_start + keys_per_range); | |
1134 | } | |
1135 | ||
1136 | int memtable_add = 0; | |
1137 | int success_add_file = 0; | |
1138 | int failed_add_file = 0; | |
1139 | std::map<std::string, std::string> true_data; | |
1140 | for (size_t i = 0; i < key_ranges.size(); i++) { | |
1141 | int range_start = key_ranges[i].first; | |
1142 | int range_end = key_ranges[i].second; | |
1143 | ||
1144 | Status s; | |
1145 | std::string range_val = "range_" + ToString(i); | |
1146 | ||
1147 | // For 20% of ranges we use DB::Put, for 80% we use DB::AddFile | |
1148 | if (i && i % 5 == 0) { | |
1149 | // Use DB::Put to insert range (insert into memtable) | |
1150 | range_val += "_put"; | |
1151 | for (int k = range_start; k <= range_end; k++) { | |
1152 | s = Put(Key(k), range_val); | |
1153 | ASSERT_OK(s); | |
1154 | } | |
1155 | memtable_add++; | |
1156 | } else { | |
1157 | // Use DB::AddFile to insert range | |
1158 | range_val += "_add_file"; | |
1159 | ||
1160 | // Generate the file containing the range | |
1161 | std::string file_name = sst_files_dir_ + env_->GenerateUniqueId(); | |
1162 | ASSERT_OK(sst_file_writer.Open(file_name)); | |
1163 | for (int k = range_start; k <= range_end; k++) { | |
11fdf7f2 | 1164 | s = sst_file_writer.Put(Key(k), range_val); |
7c673cae FG |
1165 | ASSERT_OK(s); |
1166 | } | |
1167 | ExternalSstFileInfo file_info; | |
1168 | s = sst_file_writer.Finish(&file_info); | |
1169 | ASSERT_OK(s); | |
1170 | ||
1171 | // Insert the generated file | |
1172 | s = DeprecatedAddFile({file_name}); | |
1173 | auto it = true_data.lower_bound(Key(range_start)); | |
1174 | if (option_config_ != kUniversalCompaction && | |
11fdf7f2 TL |
1175 | option_config_ != kUniversalCompactionMultiLevel && |
1176 | option_config_ != kUniversalSubcompactions) { | |
7c673cae FG |
1177 | if (it != true_data.end() && it->first <= Key(range_end)) { |
1178 | // This range overlap with data already exist in DB | |
1179 | ASSERT_NOK(s); | |
1180 | failed_add_file++; | |
1181 | } else { | |
1182 | ASSERT_OK(s); | |
1183 | success_add_file++; | |
1184 | } | |
1185 | } else { | |
1186 | if ((it != true_data.end() && it->first <= Key(range_end)) || | |
11fdf7f2 | 1187 | need_flush || assigned_seqno > 0 || overlap_with_db) { |
7c673cae FG |
1188 | // This range overlap with data already exist in DB |
1189 | ASSERT_NOK(s); | |
1190 | failed_add_file++; | |
1191 | } else { | |
1192 | ASSERT_OK(s); | |
1193 | success_add_file++; | |
1194 | } | |
1195 | } | |
1196 | } | |
1197 | ||
1198 | if (s.ok()) { | |
1199 | // Update true_data map to include the new inserted data | |
1200 | for (int k = range_start; k <= range_end; k++) { | |
1201 | true_data[Key(k)] = range_val; | |
1202 | } | |
1203 | } | |
1204 | ||
1205 | // Flush / Compact the DB | |
1206 | if (i && i % 50 == 0) { | |
1207 | Flush(); | |
1208 | } | |
1209 | if (i && i % 75 == 0) { | |
1210 | db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
1211 | } | |
1212 | } | |
1213 | ||
1214 | printf("Total: %" ROCKSDB_PRIszt | |
1215 | " ranges\n" | |
1216 | "AddFile()|Success: %d ranges\n" | |
1217 | "AddFile()|RangeConflict: %d ranges\n" | |
1218 | "Put(): %d ranges\n", | |
1219 | key_ranges.size(), success_add_file, failed_add_file, memtable_add); | |
1220 | ||
1221 | // Verify the correctness of the data | |
1222 | for (const auto& kv : true_data) { | |
1223 | ASSERT_EQ(Get(kv.first), kv.second); | |
1224 | } | |
1225 | printf("keys/values verified\n"); | |
1226 | DestroyAndRecreateExternalSSTFilesDir(); | |
1227 | } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); | |
1228 | } | |
1229 | ||
494da23a | 1230 | TEST_P(ExternalSSTFileTest, PickedLevel) { |
7c673cae FG |
1231 | Options options = CurrentOptions(); |
1232 | options.disable_auto_compactions = false; | |
1233 | options.level0_file_num_compaction_trigger = 4; | |
1234 | options.num_levels = 4; | |
1235 | DestroyAndReopen(options); | |
1236 | ||
1237 | std::map<std::string, std::string> true_data; | |
1238 | ||
1239 | // File 0 will go to last level (L3) | |
494da23a TL |
1240 | ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, -1, false, false, true, |
1241 | false, false, &true_data)); | |
7c673cae FG |
1242 | EXPECT_EQ(FilesPerLevel(), "0,0,0,1"); |
1243 | ||
1244 | // File 1 will go to level L2 (since it overlap with file 0 in L3) | |
494da23a TL |
1245 | ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, -1, false, false, true, |
1246 | false, false, &true_data)); | |
7c673cae FG |
1247 | EXPECT_EQ(FilesPerLevel(), "0,0,1,1"); |
1248 | ||
f67539c2 | 1249 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
7c673cae FG |
1250 | {"ExternalSSTFileTest::PickedLevel:0", "BackgroundCallCompaction:0"}, |
1251 | {"DBImpl::BackgroundCompaction:Start", | |
1252 | "ExternalSSTFileTest::PickedLevel:1"}, | |
1253 | {"ExternalSSTFileTest::PickedLevel:2", | |
1254 | "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, | |
1255 | }); | |
f67539c2 | 1256 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
1257 | |
1258 | // Flush 4 files containing the same keys | |
1259 | for (int i = 0; i < 4; i++) { | |
1260 | ASSERT_OK(Put(Key(3), Key(3) + "put")); | |
1261 | ASSERT_OK(Put(Key(8), Key(8) + "put")); | |
1262 | true_data[Key(3)] = Key(3) + "put"; | |
1263 | true_data[Key(8)] = Key(8) + "put"; | |
1264 | ASSERT_OK(Flush()); | |
1265 | } | |
1266 | ||
1267 | // Wait for BackgroundCompaction() to be called | |
1268 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:0"); | |
1269 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:1"); | |
1270 | ||
1271 | EXPECT_EQ(FilesPerLevel(), "4,0,1,1"); | |
1272 | ||
1273 | // This file overlaps with file 0 (L3), file 1 (L2) and the | |
1274 | // output of compaction going to L1 | |
494da23a TL |
1275 | ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true, |
1276 | false, false, &true_data)); | |
7c673cae FG |
1277 | EXPECT_EQ(FilesPerLevel(), "5,0,1,1"); |
1278 | ||
1279 | // This file does not overlap with any file or with the running compaction | |
1280 | ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false, | |
494da23a | 1281 | false, false, false, &true_data)); |
7c673cae FG |
1282 | EXPECT_EQ(FilesPerLevel(), "5,0,1,2"); |
1283 | ||
1284 | // Hold compaction from finishing | |
1285 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevel:2"); | |
1286 | ||
1287 | dbfull()->TEST_WaitForCompact(); | |
1288 | EXPECT_EQ(FilesPerLevel(), "1,1,1,2"); | |
1289 | ||
1290 | size_t kcnt = 0; | |
1291 | VerifyDBFromMap(true_data, &kcnt, false); | |
1292 | ||
f67539c2 | 1293 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
1294 | } |
1295 | ||
1296 | TEST_F(ExternalSSTFileTest, PickedLevelBug) { | |
1297 | Options options = CurrentOptions(); | |
1298 | options.disable_auto_compactions = false; | |
1299 | options.level0_file_num_compaction_trigger = 3; | |
1300 | options.num_levels = 2; | |
1301 | DestroyAndReopen(options); | |
1302 | ||
1303 | std::vector<int> file_keys; | |
1304 | ||
1305 | // file #1 in L0 | |
1306 | file_keys = {0, 5, 7}; | |
1307 | for (int k : file_keys) { | |
1308 | ASSERT_OK(Put(Key(k), Key(k))); | |
1309 | } | |
1310 | ASSERT_OK(Flush()); | |
1311 | ||
1312 | // file #2 in L0 | |
1313 | file_keys = {4, 6, 8, 9}; | |
1314 | for (int k : file_keys) { | |
1315 | ASSERT_OK(Put(Key(k), Key(k))); | |
1316 | } | |
1317 | ASSERT_OK(Flush()); | |
1318 | ||
1319 | // We have 2 overlapping files in L0 | |
1320 | EXPECT_EQ(FilesPerLevel(), "2"); | |
1321 | ||
f67539c2 | 1322 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
7c673cae FG |
1323 | {{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"}, |
1324 | {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"}, | |
1325 | {"ExternalSSTFileTest::PickedLevelBug:2", | |
1326 | "DBImpl::RunManualCompaction:0"}, | |
1327 | {"ExternalSSTFileTest::PickedLevelBug:3", | |
1328 | "DBImpl::RunManualCompaction:1"}}); | |
1329 | ||
1330 | std::atomic<bool> bg_compact_started(false); | |
f67539c2 | 1331 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
7c673cae | 1332 | "DBImpl::BackgroundCompaction:Start", |
11fdf7f2 | 1333 | [&](void* /*arg*/) { bg_compact_started.store(true); }); |
7c673cae | 1334 | |
f67539c2 | 1335 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
1336 | |
1337 | // While writing the MANIFEST start a thread that will ask for compaction | |
f67539c2 | 1338 | ROCKSDB_NAMESPACE::port::Thread bg_compact([&]() { |
7c673cae FG |
1339 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
1340 | }); | |
1341 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2"); | |
1342 | ||
1343 | // Start a thread that will ingest a new file | |
f67539c2 | 1344 | ROCKSDB_NAMESPACE::port::Thread bg_addfile([&]() { |
7c673cae FG |
1345 | file_keys = {1, 2, 3}; |
1346 | ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1)); | |
1347 | }); | |
1348 | ||
1349 | // Wait for AddFile to start picking levels and writing MANIFEST | |
1350 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0"); | |
1351 | ||
1352 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3"); | |
1353 | ||
1354 | // We need to verify that no compactions can run while AddFile is | |
1355 | // ingesting the files into the levels it find suitable. So we will | |
1356 | // wait for 2 seconds to give a chance for compactions to run during | |
1357 | // this period, and then make sure that no compactions where able to run | |
1358 | env_->SleepForMicroseconds(1000000 * 2); | |
1359 | ASSERT_FALSE(bg_compact_started.load()); | |
1360 | ||
1361 | // Hold AddFile from finishing writing the MANIFEST | |
1362 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1"); | |
1363 | ||
1364 | bg_addfile.join(); | |
1365 | bg_compact.join(); | |
1366 | ||
1367 | dbfull()->TEST_WaitForCompact(); | |
1368 | ||
1369 | int total_keys = 0; | |
1370 | Iterator* iter = db_->NewIterator(ReadOptions()); | |
1371 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
1372 | ASSERT_OK(iter->status()); | |
1373 | total_keys++; | |
1374 | } | |
1375 | ASSERT_EQ(total_keys, 10); | |
1376 | ||
1377 | delete iter; | |
1378 | ||
f67539c2 | 1379 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
1380 | } |
1381 | ||
11fdf7f2 TL |
1382 | TEST_F(ExternalSSTFileTest, IngestNonExistingFile) { |
1383 | Options options = CurrentOptions(); | |
1384 | DestroyAndReopen(options); | |
1385 | ||
1386 | Status s = db_->IngestExternalFile({"non_existing_file"}, | |
1387 | IngestExternalFileOptions()); | |
1388 | ASSERT_NOK(s); | |
1389 | ||
1390 | // Verify file deletion is not impacted (verify a bug fix) | |
1391 | ASSERT_OK(Put(Key(1), Key(1))); | |
1392 | ASSERT_OK(Put(Key(9), Key(9))); | |
1393 | ASSERT_OK(Flush()); | |
1394 | ||
1395 | ASSERT_OK(Put(Key(1), Key(1))); | |
1396 | ASSERT_OK(Put(Key(9), Key(9))); | |
1397 | ASSERT_OK(Flush()); | |
1398 | ||
1399 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
494da23a | 1400 | ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); |
11fdf7f2 TL |
1401 | |
1402 | // After full compaction, there should be only 1 file. | |
1403 | std::vector<std::string> files; | |
1404 | env_->GetChildren(dbname_, &files); | |
1405 | int num_sst_files = 0; | |
1406 | for (auto& f : files) { | |
1407 | uint64_t number; | |
1408 | FileType type; | |
1409 | if (ParseFileName(f, &number, &type) && type == kTableFile) { | |
1410 | num_sst_files++; | |
1411 | } | |
1412 | } | |
1413 | ASSERT_EQ(1, num_sst_files); | |
1414 | } | |
1415 | ||
7c673cae FG |
1416 | TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { |
1417 | Options options = CurrentOptions(); | |
1418 | options.disable_auto_compactions = false; | |
1419 | options.level0_file_num_compaction_trigger = 2; | |
1420 | options.num_levels = 2; | |
1421 | DestroyAndReopen(options); | |
1422 | ||
1423 | std::function<void()> bg_compact = [&]() { | |
1424 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
1425 | }; | |
1426 | ||
1427 | int range_id = 0; | |
1428 | std::vector<int> file_keys; | |
1429 | std::function<void()> bg_addfile = [&]() { | |
1430 | ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id)); | |
1431 | }; | |
1432 | ||
11fdf7f2 | 1433 | const int num_of_ranges = 1000; |
7c673cae | 1434 | std::vector<port::Thread> threads; |
11fdf7f2 | 1435 | while (range_id < num_of_ranges) { |
7c673cae FG |
1436 | int range_start = range_id * 10; |
1437 | int range_end = range_start + 10; | |
1438 | ||
1439 | file_keys.clear(); | |
1440 | for (int k = range_start + 1; k < range_end; k++) { | |
1441 | file_keys.push_back(k); | |
1442 | } | |
1443 | ASSERT_OK(Put(Key(range_start), Key(range_start))); | |
1444 | ASSERT_OK(Put(Key(range_end), Key(range_end))); | |
1445 | ASSERT_OK(Flush()); | |
1446 | ||
1447 | if (range_id % 10 == 0) { | |
1448 | threads.emplace_back(bg_compact); | |
1449 | } | |
1450 | threads.emplace_back(bg_addfile); | |
1451 | ||
1452 | for (auto& t : threads) { | |
1453 | t.join(); | |
1454 | } | |
1455 | threads.clear(); | |
1456 | ||
1457 | range_id++; | |
1458 | } | |
1459 | ||
11fdf7f2 | 1460 | for (int rid = 0; rid < num_of_ranges; rid++) { |
7c673cae FG |
1461 | int range_start = rid * 10; |
1462 | int range_end = range_start + 10; | |
1463 | ||
1464 | ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid; | |
1465 | ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid; | |
1466 | for (int k = range_start + 1; k < range_end; k++) { | |
1467 | std::string v = Key(k) + ToString(rid); | |
1468 | ASSERT_EQ(Get(Key(k)), v) << rid; | |
1469 | } | |
1470 | } | |
1471 | } | |
1472 | ||
1473 | TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { | |
1474 | Options options = CurrentOptions(); | |
1475 | options.disable_auto_compactions = false; | |
1476 | options.level0_file_num_compaction_trigger = 4; | |
1477 | options.level_compaction_dynamic_level_bytes = true; | |
1478 | options.num_levels = 4; | |
1479 | DestroyAndReopen(options); | |
1480 | std::map<std::string, std::string> true_data; | |
1481 | ||
f67539c2 | 1482 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
7c673cae FG |
1483 | {"ExternalSSTFileTest::PickedLevelDynamic:0", |
1484 | "BackgroundCallCompaction:0"}, | |
1485 | {"DBImpl::BackgroundCompaction:Start", | |
1486 | "ExternalSSTFileTest::PickedLevelDynamic:1"}, | |
1487 | {"ExternalSSTFileTest::PickedLevelDynamic:2", | |
1488 | "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, | |
1489 | }); | |
f67539c2 | 1490 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
1491 | |
1492 | // Flush 4 files containing the same keys | |
1493 | for (int i = 0; i < 4; i++) { | |
1494 | for (int k = 20; k <= 30; k++) { | |
1495 | ASSERT_OK(Put(Key(k), Key(k) + "put")); | |
1496 | true_data[Key(k)] = Key(k) + "put"; | |
1497 | } | |
1498 | for (int k = 50; k <= 60; k++) { | |
1499 | ASSERT_OK(Put(Key(k), Key(k) + "put")); | |
1500 | true_data[Key(k)] = Key(k) + "put"; | |
1501 | } | |
1502 | ASSERT_OK(Flush()); | |
1503 | } | |
1504 | ||
1505 | // Wait for BackgroundCompaction() to be called | |
1506 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:0"); | |
1507 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:1"); | |
1508 | ||
1509 | // This file overlaps with the output of the compaction (going to L3) | |
1510 | // so the file will be added to L0 since L3 is the base level | |
1511 | ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false, | |
494da23a | 1512 | false, true, false, false, &true_data)); |
7c673cae FG |
1513 | EXPECT_EQ(FilesPerLevel(), "5"); |
1514 | ||
1515 | // This file does not overlap with the current running compactiong | |
1516 | ASSERT_OK(GenerateAndAddExternalFile(options, {9000, 9001}, -1, false, false, | |
494da23a | 1517 | true, false, false, &true_data)); |
7c673cae FG |
1518 | EXPECT_EQ(FilesPerLevel(), "5,0,0,1"); |
1519 | ||
1520 | // Hold compaction from finishing | |
1521 | TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelDynamic:2"); | |
1522 | ||
1523 | // Output of the compaction will go to L3 | |
1524 | dbfull()->TEST_WaitForCompact(); | |
1525 | EXPECT_EQ(FilesPerLevel(), "1,0,0,2"); | |
1526 | ||
1527 | Close(); | |
1528 | options.disable_auto_compactions = true; | |
1529 | Reopen(options); | |
1530 | ||
1531 | ASSERT_OK(GenerateAndAddExternalFile(options, {1, 15, 19}, -1, false, false, | |
494da23a | 1532 | true, false, false, &true_data)); |
7c673cae FG |
1533 | ASSERT_EQ(FilesPerLevel(), "1,0,0,3"); |
1534 | ||
1535 | ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1001, 1002}, -1, false, | |
494da23a | 1536 | false, true, false, false, &true_data)); |
7c673cae FG |
1537 | ASSERT_EQ(FilesPerLevel(), "1,0,0,4"); |
1538 | ||
1539 | ASSERT_OK(GenerateAndAddExternalFile(options, {500, 600, 700}, -1, false, | |
494da23a | 1540 | false, true, false, false, &true_data)); |
7c673cae FG |
1541 | ASSERT_EQ(FilesPerLevel(), "1,0,0,5"); |
1542 | ||
1543 | // File 5 overlaps with file 2 (L3 / base level) | |
494da23a TL |
1544 | ASSERT_OK(GenerateAndAddExternalFile(options, {2, 10}, -1, false, false, true, |
1545 | false, false, &true_data)); | |
7c673cae FG |
1546 | ASSERT_EQ(FilesPerLevel(), "2,0,0,5"); |
1547 | ||
1548 | // File 6 overlaps with file 2 (L3 / base level) and file 5 (L0) | |
494da23a TL |
1549 | ASSERT_OK(GenerateAndAddExternalFile(options, {3, 9}, -1, false, false, true, |
1550 | false, false, &true_data)); | |
7c673cae FG |
1551 | ASSERT_EQ(FilesPerLevel(), "3,0,0,5"); |
1552 | ||
1553 | // Verify data in files | |
1554 | size_t kcnt = 0; | |
1555 | VerifyDBFromMap(true_data, &kcnt, false); | |
1556 | ||
1557 | // Write range [5 => 10] to L0 | |
1558 | for (int i = 5; i <= 10; i++) { | |
1559 | std::string k = Key(i); | |
1560 | std::string v = k + "put"; | |
1561 | ASSERT_OK(Put(k, v)); | |
1562 | true_data[k] = v; | |
1563 | } | |
1564 | ASSERT_OK(Flush()); | |
1565 | ASSERT_EQ(FilesPerLevel(), "4,0,0,5"); | |
1566 | ||
1567 | // File 7 overlaps with file 4 (L3) | |
1568 | ASSERT_OK(GenerateAndAddExternalFile(options, {650, 651, 652}, -1, false, | |
494da23a | 1569 | false, true, false, false, &true_data)); |
7c673cae FG |
1570 | ASSERT_EQ(FilesPerLevel(), "5,0,0,5"); |
1571 | ||
1572 | VerifyDBFromMap(true_data, &kcnt, false); | |
1573 | ||
f67539c2 | 1574 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
1575 | } |
1576 | ||
1577 | TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) { | |
1578 | Options options = CurrentOptions(); | |
1579 | options.comparator = ReverseBytewiseComparator(); | |
1580 | DestroyAndReopen(options); | |
1581 | ||
1582 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
1583 | ||
1584 | // Generate files with these key ranges | |
1585 | // {14 -> 0} | |
1586 | // {24 -> 10} | |
1587 | // {34 -> 20} | |
1588 | // {44 -> 30} | |
1589 | // .. | |
1590 | std::vector<std::string> generated_files; | |
1591 | for (int i = 0; i < 10; i++) { | |
1592 | std::string file_name = sst_files_dir_ + env_->GenerateUniqueId(); | |
1593 | ASSERT_OK(sst_file_writer.Open(file_name)); | |
1594 | ||
1595 | int range_end = i * 10; | |
1596 | int range_start = range_end + 15; | |
1597 | for (int k = (range_start - 1); k >= range_end; k--) { | |
11fdf7f2 | 1598 | ASSERT_OK(sst_file_writer.Put(Key(k), Key(k))); |
7c673cae FG |
1599 | } |
1600 | ExternalSstFileInfo file_info; | |
1601 | ASSERT_OK(sst_file_writer.Finish(&file_info)); | |
1602 | generated_files.push_back(file_name); | |
1603 | } | |
1604 | ||
1605 | std::vector<std::string> in_files; | |
1606 | ||
1607 | // These 2nd and 3rd files overlap with each other | |
1608 | in_files = {generated_files[0], generated_files[4], generated_files[5], | |
1609 | generated_files[7]}; | |
1610 | ASSERT_NOK(DeprecatedAddFile(in_files)); | |
1611 | ||
1612 | // These 2 files dont overlap with each other | |
1613 | in_files = {generated_files[0], generated_files[2]}; | |
1614 | ASSERT_OK(DeprecatedAddFile(in_files)); | |
1615 | ||
1616 | // These 2 files dont overlap with each other but overlap with keys in DB | |
1617 | in_files = {generated_files[3], generated_files[7]}; | |
1618 | ASSERT_NOK(DeprecatedAddFile(in_files)); | |
1619 | ||
1620 | // Files dont overlap and dont overlap with DB key range | |
1621 | in_files = {generated_files[4], generated_files[6], generated_files[8]}; | |
1622 | ASSERT_OK(DeprecatedAddFile(in_files)); | |
1623 | ||
1624 | for (int i = 0; i < 100; i++) { | |
1625 | if (i % 20 <= 14) { | |
1626 | ASSERT_EQ(Get(Key(i)), Key(i)); | |
1627 | } else { | |
1628 | ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); | |
1629 | } | |
1630 | } | |
1631 | } | |
1632 | ||
1633 | TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { | |
1634 | Options options = CurrentOptions(); | |
1635 | options.num_levels = 3; | |
1636 | options.IncreaseParallelism(20); | |
1637 | DestroyAndReopen(options); | |
1638 | ||
1639 | ASSERT_OK(GenerateAndAddExternalFile(options, {1, 4}, 1)); // L3 | |
1640 | ASSERT_OK(GenerateAndAddExternalFile(options, {2, 3}, 2)); // L2 | |
1641 | ||
1642 | ASSERT_OK(GenerateAndAddExternalFile(options, {10, 14}, 3)); // L3 | |
1643 | ASSERT_OK(GenerateAndAddExternalFile(options, {12, 13}, 4)); // L2 | |
1644 | ||
1645 | ASSERT_OK(GenerateAndAddExternalFile(options, {20, 24}, 5)); // L3 | |
1646 | ASSERT_OK(GenerateAndAddExternalFile(options, {22, 23}, 6)); // L2 | |
1647 | ||
f67539c2 | 1648 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 | 1649 | "CompactionJob::Run():Start", [&](void* /*arg*/) { |
7c673cae FG |
1650 | // fit in L3 but will overlap with compaction so will be added |
1651 | // to L2 but a compaction will trivially move it to L3 | |
1652 | // and break LSM consistency | |
11fdf7f2 TL |
1653 | static std::atomic<bool> called = {false}; |
1654 | if (!called) { | |
1655 | called = true; | |
1656 | ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); | |
1657 | ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); | |
1658 | } | |
7c673cae | 1659 | }); |
f67539c2 | 1660 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
1661 | |
1662 | CompactRangeOptions cro; | |
1663 | cro.exclusive_manual_compaction = false; | |
1664 | ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); | |
1665 | ||
1666 | dbfull()->TEST_WaitForCompact(); | |
1667 | ||
f67539c2 | 1668 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
7c673cae FG |
1669 | } |
1670 | ||
1671 | TEST_F(ExternalSSTFileTest, CompactAddedFiles) { | |
1672 | Options options = CurrentOptions(); | |
1673 | options.num_levels = 3; | |
1674 | DestroyAndReopen(options); | |
1675 | ||
1676 | ASSERT_OK(GenerateAndAddExternalFile(options, {1, 10}, 1)); // L3 | |
1677 | ASSERT_OK(GenerateAndAddExternalFile(options, {2, 9}, 2)); // L2 | |
1678 | ASSERT_OK(GenerateAndAddExternalFile(options, {3, 8}, 3)); // L1 | |
1679 | ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, 4)); // L0 | |
1680 | ||
1681 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); | |
1682 | } | |
1683 | ||
1684 | TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { | |
1685 | Options options = CurrentOptions(); | |
1686 | DestroyAndReopen(options); | |
1687 | std::string file_path = sst_files_dir_ + "/not_shared"; | |
1688 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
1689 | ||
1690 | std::string suffix(100, 'X'); | |
1691 | ASSERT_OK(sst_file_writer.Open(file_path)); | |
11fdf7f2 TL |
1692 | ASSERT_OK(sst_file_writer.Put("A" + suffix, "VAL")); |
1693 | ASSERT_OK(sst_file_writer.Put("BB" + suffix, "VAL")); | |
1694 | ASSERT_OK(sst_file_writer.Put("CC" + suffix, "VAL")); | |
1695 | ASSERT_OK(sst_file_writer.Put("CXD" + suffix, "VAL")); | |
1696 | ASSERT_OK(sst_file_writer.Put("CZZZ" + suffix, "VAL")); | |
1697 | ASSERT_OK(sst_file_writer.Put("ZAAAX" + suffix, "VAL")); | |
7c673cae FG |
1698 | |
1699 | ASSERT_OK(sst_file_writer.Finish()); | |
1700 | ASSERT_OK(DeprecatedAddFile({file_path})); | |
1701 | } | |
1702 | ||
f67539c2 TL |
1703 | TEST_F(ExternalSSTFileTest, WithUnorderedWrite) { |
1704 | SyncPoint::GetInstance()->DisableProcessing(); | |
1705 | SyncPoint::GetInstance()->LoadDependency( | |
1706 | {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL", | |
1707 | "ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"}, | |
1708 | {"DBImpl::WaitForPendingWrites:BeforeBlock", | |
1709 | "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}}); | |
1710 | SyncPoint::GetInstance()->SetCallBack( | |
1711 | "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) { | |
1712 | ASSERT_TRUE(*reinterpret_cast<bool*>(need_flush)); | |
1713 | }); | |
1714 | ||
1715 | Options options = CurrentOptions(); | |
1716 | options.unordered_write = true; | |
1717 | DestroyAndReopen(options); | |
1718 | Put("foo", "v1"); | |
1719 | SyncPoint::GetInstance()->EnableProcessing(); | |
1720 | port::Thread writer([&]() { Put("bar", "v2"); }); | |
1721 | ||
1722 | TEST_SYNC_POINT("ExternalSSTFileTest::WithUnorderedWrite:WaitWriteWAL"); | |
1723 | ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1, | |
1724 | true /* allow_global_seqno */)); | |
1725 | ASSERT_EQ(Get("bar"), "v3"); | |
1726 | ||
1727 | writer.join(); | |
1728 | SyncPoint::GetInstance()->DisableProcessing(); | |
1729 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
1730 | } | |
1731 | ||
494da23a | 1732 | TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) { |
7c673cae FG |
1733 | Options options = CurrentOptions(); |
1734 | options.IncreaseParallelism(20); | |
1735 | options.level0_slowdown_writes_trigger = 256; | |
1736 | options.level0_stop_writes_trigger = 256; | |
1737 | ||
494da23a TL |
1738 | bool write_global_seqno = std::get<0>(GetParam()); |
1739 | bool verify_checksums_before_ingest = std::get<1>(GetParam()); | |
7c673cae FG |
1740 | for (int iter = 0; iter < 2; iter++) { |
1741 | bool write_to_memtable = (iter == 0); | |
1742 | DestroyAndReopen(options); | |
1743 | ||
1744 | Random rnd(301); | |
1745 | std::map<std::string, std::string> true_data; | |
11fdf7f2 | 1746 | for (int i = 0; i < 500; i++) { |
7c673cae FG |
1747 | std::vector<std::pair<std::string, std::string>> random_data; |
1748 | for (int j = 0; j < 100; j++) { | |
1749 | std::string k; | |
1750 | std::string v; | |
1751 | test::RandomString(&rnd, rnd.Next() % 20, &k); | |
1752 | test::RandomString(&rnd, rnd.Next() % 50, &v); | |
1753 | random_data.emplace_back(k, v); | |
1754 | } | |
1755 | ||
1756 | if (write_to_memtable && rnd.OneIn(4)) { | |
1757 | // 25% of writes go through memtable | |
1758 | for (auto& entry : random_data) { | |
1759 | ASSERT_OK(Put(entry.first, entry.second)); | |
1760 | true_data[entry.first] = entry.second; | |
1761 | } | |
1762 | } else { | |
494da23a TL |
1763 | ASSERT_OK(GenerateAndAddExternalFile( |
1764 | options, random_data, -1, true, write_global_seqno, | |
1765 | verify_checksums_before_ingest, false, true, &true_data)); | |
7c673cae FG |
1766 | } |
1767 | } | |
1768 | size_t kcnt = 0; | |
1769 | VerifyDBFromMap(true_data, &kcnt, false); | |
1770 | db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
1771 | VerifyDBFromMap(true_data, &kcnt, false); | |
1772 | } | |
1773 | } | |
1774 | ||
494da23a | 1775 | TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoAssignedLevel) { |
7c673cae FG |
1776 | Options options = CurrentOptions(); |
1777 | options.num_levels = 5; | |
1778 | options.disable_auto_compactions = true; | |
1779 | DestroyAndReopen(options); | |
1780 | std::vector<std::pair<std::string, std::string>> file_data; | |
1781 | std::map<std::string, std::string> true_data; | |
1782 | ||
1783 | // Insert 100 -> 200 into the memtable | |
1784 | for (int i = 100; i <= 200; i++) { | |
1785 | ASSERT_OK(Put(Key(i), "memtable")); | |
1786 | true_data[Key(i)] = "memtable"; | |
1787 | } | |
1788 | ||
1789 | // Insert 0 -> 20 using AddFile | |
1790 | file_data.clear(); | |
1791 | for (int i = 0; i <= 20; i++) { | |
1792 | file_data.emplace_back(Key(i), "L4"); | |
1793 | } | |
494da23a TL |
1794 | bool write_global_seqno = std::get<0>(GetParam()); |
1795 | bool verify_checksums_before_ingest = std::get<1>(GetParam()); | |
1796 | ASSERT_OK(GenerateAndAddExternalFile( | |
1797 | options, file_data, -1, true, write_global_seqno, | |
1798 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1799 | |
1800 | // This file dont overlap with anything in the DB, will go to L4 | |
1801 | ASSERT_EQ("0,0,0,0,1", FilesPerLevel()); | |
1802 | ||
1803 | // Insert 80 -> 130 using AddFile | |
1804 | file_data.clear(); | |
1805 | for (int i = 80; i <= 130; i++) { | |
1806 | file_data.emplace_back(Key(i), "L0"); | |
1807 | } | |
494da23a TL |
1808 | ASSERT_OK(GenerateAndAddExternalFile( |
1809 | options, file_data, -1, true, write_global_seqno, | |
1810 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1811 | |
1812 | // This file overlap with the memtable, so it will flush it and add | |
1813 | // it self to L0 | |
1814 | ASSERT_EQ("2,0,0,0,1", FilesPerLevel()); | |
1815 | ||
1816 | // Insert 30 -> 50 using AddFile | |
1817 | file_data.clear(); | |
1818 | for (int i = 30; i <= 50; i++) { | |
1819 | file_data.emplace_back(Key(i), "L4"); | |
1820 | } | |
494da23a TL |
1821 | ASSERT_OK(GenerateAndAddExternalFile( |
1822 | options, file_data, -1, true, write_global_seqno, | |
1823 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1824 | |
1825 | // This file dont overlap with anything in the DB and fit in L4 as well | |
1826 | ASSERT_EQ("2,0,0,0,2", FilesPerLevel()); | |
1827 | ||
1828 | // Insert 10 -> 40 using AddFile | |
1829 | file_data.clear(); | |
1830 | for (int i = 10; i <= 40; i++) { | |
1831 | file_data.emplace_back(Key(i), "L3"); | |
1832 | } | |
494da23a TL |
1833 | ASSERT_OK(GenerateAndAddExternalFile( |
1834 | options, file_data, -1, true, write_global_seqno, | |
1835 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1836 | |
1837 | // This file overlap with files in L4, we will ingest it in L3 | |
1838 | ASSERT_EQ("2,0,0,1,2", FilesPerLevel()); | |
1839 | ||
1840 | size_t kcnt = 0; | |
1841 | VerifyDBFromMap(true_data, &kcnt, false); | |
1842 | } | |
1843 | ||
494da23a | 1844 | TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoMemtableFlush) { |
7c673cae FG |
1845 | Options options = CurrentOptions(); |
1846 | DestroyAndReopen(options); | |
1847 | uint64_t entries_in_memtable; | |
1848 | std::map<std::string, std::string> true_data; | |
1849 | ||
1850 | for (int k : {10, 20, 40, 80}) { | |
1851 | ASSERT_OK(Put(Key(k), "memtable")); | |
1852 | true_data[Key(k)] = "memtable"; | |
1853 | } | |
1854 | db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, | |
1855 | &entries_in_memtable); | |
1856 | ASSERT_GE(entries_in_memtable, 1); | |
1857 | ||
494da23a TL |
1858 | bool write_global_seqno = std::get<0>(GetParam()); |
1859 | bool verify_checksums_before_ingest = std::get<1>(GetParam()); | |
7c673cae | 1860 | // No need for flush |
494da23a TL |
1861 | ASSERT_OK(GenerateAndAddExternalFile( |
1862 | options, {90, 100, 110}, -1, true, write_global_seqno, | |
1863 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1864 | db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, |
1865 | &entries_in_memtable); | |
1866 | ASSERT_GE(entries_in_memtable, 1); | |
1867 | ||
1868 | // This file will flush the memtable | |
494da23a TL |
1869 | ASSERT_OK(GenerateAndAddExternalFile( |
1870 | options, {19, 20, 21}, -1, true, write_global_seqno, | |
1871 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1872 | db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, |
1873 | &entries_in_memtable); | |
1874 | ASSERT_EQ(entries_in_memtable, 0); | |
1875 | ||
1876 | for (int k : {200, 201, 205, 206}) { | |
1877 | ASSERT_OK(Put(Key(k), "memtable")); | |
1878 | true_data[Key(k)] = "memtable"; | |
1879 | } | |
1880 | db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, | |
1881 | &entries_in_memtable); | |
1882 | ASSERT_GE(entries_in_memtable, 1); | |
1883 | ||
1884 | // No need for flush, this file keys fit between the memtable keys | |
494da23a TL |
1885 | ASSERT_OK(GenerateAndAddExternalFile( |
1886 | options, {202, 203, 204}, -1, true, write_global_seqno, | |
1887 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1888 | db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, |
1889 | &entries_in_memtable); | |
1890 | ASSERT_GE(entries_in_memtable, 1); | |
1891 | ||
1892 | // This file will flush the memtable | |
494da23a TL |
1893 | ASSERT_OK(GenerateAndAddExternalFile( |
1894 | options, {206, 207}, -1, true, write_global_seqno, | |
1895 | verify_checksums_before_ingest, false, false, &true_data)); | |
7c673cae FG |
1896 | db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, |
1897 | &entries_in_memtable); | |
1898 | ASSERT_EQ(entries_in_memtable, 0); | |
1899 | ||
1900 | size_t kcnt = 0; | |
1901 | VerifyDBFromMap(true_data, &kcnt, false); | |
1902 | } | |
1903 | ||
494da23a | 1904 | TEST_P(ExternalSSTFileTest, L0SortingIssue) { |
7c673cae FG |
1905 | Options options = CurrentOptions(); |
1906 | options.num_levels = 2; | |
1907 | DestroyAndReopen(options); | |
1908 | std::map<std::string, std::string> true_data; | |
1909 | ||
1910 | ASSERT_OK(Put(Key(1), "memtable")); | |
1911 | ASSERT_OK(Put(Key(10), "memtable")); | |
1912 | ||
494da23a TL |
1913 | bool write_global_seqno = std::get<0>(GetParam()); |
1914 | bool verify_checksums_before_ingest = std::get<1>(GetParam()); | |
7c673cae | 1915 | // No Flush needed, No global seqno needed, Ingest in L1 |
494da23a TL |
1916 | ASSERT_OK( |
1917 | GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno, | |
1918 | verify_checksums_before_ingest, false, false)); | |
7c673cae | 1919 | // No Flush needed, but need a global seqno, Ingest in L0 |
494da23a TL |
1920 | ASSERT_OK( |
1921 | GenerateAndAddExternalFile(options, {7, 8}, -1, true, write_global_seqno, | |
1922 | verify_checksums_before_ingest, false, false)); | |
7c673cae FG |
1923 | printf("%s\n", FilesPerLevel().c_str()); |
1924 | ||
1925 | // Overwrite what we added using external files | |
1926 | ASSERT_OK(Put(Key(7), "memtable")); | |
1927 | ASSERT_OK(Put(Key(8), "memtable")); | |
1928 | ||
1929 | // Read values from memtable | |
1930 | ASSERT_EQ(Get(Key(7)), "memtable"); | |
1931 | ASSERT_EQ(Get(Key(8)), "memtable"); | |
1932 | ||
1933 | // Flush and read from L0 | |
1934 | ASSERT_OK(Flush()); | |
1935 | printf("%s\n", FilesPerLevel().c_str()); | |
1936 | ASSERT_EQ(Get(Key(7)), "memtable"); | |
1937 | ASSERT_EQ(Get(Key(8)), "memtable"); | |
1938 | } | |
1939 | ||
1940 | TEST_F(ExternalSSTFileTest, CompactionDeadlock) { | |
1941 | Options options = CurrentOptions(); | |
1942 | options.num_levels = 2; | |
1943 | options.level0_file_num_compaction_trigger = 4; | |
1944 | options.level0_slowdown_writes_trigger = 4; | |
1945 | options.level0_stop_writes_trigger = 4; | |
1946 | DestroyAndReopen(options); | |
1947 | ||
1948 | // atomic conter of currently running bg threads | |
1949 | std::atomic<int> running_threads(0); | |
1950 | ||
f67539c2 | 1951 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ |
7c673cae FG |
1952 | {"DBImpl::DelayWrite:Wait", "ExternalSSTFileTest::DeadLock:0"}, |
1953 | {"ExternalSSTFileTest::DeadLock:1", "DBImpl::AddFile:Start"}, | |
1954 | {"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::DeadLock:2"}, | |
1955 | {"ExternalSSTFileTest::DeadLock:3", "BackgroundCallCompaction:0"}, | |
1956 | }); | |
f67539c2 | 1957 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
1958 | |
1959 | // Start ingesting and extrnal file in the background | |
f67539c2 | 1960 | ROCKSDB_NAMESPACE::port::Thread bg_ingest_file([&]() { |
7c673cae FG |
1961 | running_threads += 1; |
1962 | ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6})); | |
1963 | running_threads -= 1; | |
1964 | }); | |
1965 | ||
1966 | ASSERT_OK(Put(Key(1), "memtable")); | |
1967 | ASSERT_OK(Flush()); | |
1968 | ||
1969 | ASSERT_OK(Put(Key(2), "memtable")); | |
1970 | ASSERT_OK(Flush()); | |
1971 | ||
1972 | ASSERT_OK(Put(Key(3), "memtable")); | |
1973 | ASSERT_OK(Flush()); | |
1974 | ||
1975 | ASSERT_OK(Put(Key(4), "memtable")); | |
1976 | ASSERT_OK(Flush()); | |
1977 | ||
1978 | // This thread will try to insert into the memtable but since we have 4 L0 | |
1979 | // files this thread will be blocked and hold the writer thread | |
f67539c2 | 1980 | ROCKSDB_NAMESPACE::port::Thread bg_block_put([&]() { |
7c673cae FG |
1981 | running_threads += 1; |
1982 | ASSERT_OK(Put(Key(10), "memtable")); | |
1983 | running_threads -= 1; | |
1984 | }); | |
1985 | ||
1986 | // Make sure DelayWrite is called first | |
1987 | TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:0"); | |
1988 | ||
1989 | // `DBImpl::AddFile:Start` will wait until we be here | |
1990 | TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:1"); | |
1991 | ||
1992 | // Wait for IngestExternalFile() to start and aquire mutex | |
1993 | TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:2"); | |
1994 | ||
1995 | // Now let compaction start | |
1996 | TEST_SYNC_POINT("ExternalSSTFileTest::DeadLock:3"); | |
1997 | ||
1998 | // Wait for max 5 seconds, if we did not finish all bg threads | |
1999 | // then we hit the deadlock bug | |
2000 | for (int i = 0; i < 10; i++) { | |
2001 | if (running_threads.load() == 0) { | |
2002 | break; | |
2003 | } | |
2004 | env_->SleepForMicroseconds(500000); | |
2005 | } | |
2006 | ||
2007 | ASSERT_EQ(running_threads.load(), 0); | |
2008 | ||
2009 | bg_ingest_file.join(); | |
2010 | bg_block_put.join(); | |
2011 | } | |
2012 | ||
2013 | TEST_F(ExternalSSTFileTest, DirtyExit) { | |
2014 | Options options = CurrentOptions(); | |
2015 | DestroyAndReopen(options); | |
2016 | std::string file_path = sst_files_dir_ + "/dirty_exit"; | |
2017 | std::unique_ptr<SstFileWriter> sst_file_writer; | |
2018 | ||
2019 | // Destruct SstFileWriter without calling Finish() | |
2020 | sst_file_writer.reset(new SstFileWriter(EnvOptions(), options)); | |
2021 | ASSERT_OK(sst_file_writer->Open(file_path)); | |
2022 | sst_file_writer.reset(); | |
2023 | ||
2024 | // Destruct SstFileWriter with a failing Finish | |
2025 | sst_file_writer.reset(new SstFileWriter(EnvOptions(), options)); | |
2026 | ASSERT_OK(sst_file_writer->Open(file_path)); | |
2027 | ASSERT_NOK(sst_file_writer->Finish()); | |
2028 | } | |
2029 | ||
2030 | TEST_F(ExternalSSTFileTest, FileWithCFInfo) { | |
2031 | Options options = CurrentOptions(); | |
2032 | CreateAndReopenWithCF({"koko", "toto"}, options); | |
2033 | ||
2034 | SstFileWriter sfw_default(EnvOptions(), options, handles_[0]); | |
2035 | SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); | |
2036 | SstFileWriter sfw_cf2(EnvOptions(), options, handles_[2]); | |
2037 | SstFileWriter sfw_unknown(EnvOptions(), options); | |
2038 | ||
2039 | // default_cf.sst | |
2040 | const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst"; | |
2041 | ASSERT_OK(sfw_default.Open(cf_default_sst)); | |
11fdf7f2 TL |
2042 | ASSERT_OK(sfw_default.Put("K1", "V1")); |
2043 | ASSERT_OK(sfw_default.Put("K2", "V2")); | |
7c673cae FG |
2044 | ASSERT_OK(sfw_default.Finish()); |
2045 | ||
2046 | // cf1.sst | |
2047 | const std::string cf1_sst = sst_files_dir_ + "/cf1.sst"; | |
2048 | ASSERT_OK(sfw_cf1.Open(cf1_sst)); | |
11fdf7f2 TL |
2049 | ASSERT_OK(sfw_cf1.Put("K3", "V1")); |
2050 | ASSERT_OK(sfw_cf1.Put("K4", "V2")); | |
7c673cae FG |
2051 | ASSERT_OK(sfw_cf1.Finish()); |
2052 | ||
2053 | // cf_unknown.sst | |
2054 | const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst"; | |
2055 | ASSERT_OK(sfw_unknown.Open(unknown_sst)); | |
11fdf7f2 TL |
2056 | ASSERT_OK(sfw_unknown.Put("K5", "V1")); |
2057 | ASSERT_OK(sfw_unknown.Put("K6", "V2")); | |
7c673cae FG |
2058 | ASSERT_OK(sfw_unknown.Finish()); |
2059 | ||
2060 | IngestExternalFileOptions ifo; | |
2061 | ||
2062 | // SST CF dont match | |
2063 | ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo)); | |
2064 | // SST CF dont match | |
2065 | ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo)); | |
2066 | // SST CF match | |
2067 | ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo)); | |
2068 | ||
2069 | // SST CF dont match | |
2070 | ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo)); | |
2071 | // SST CF dont match | |
2072 | ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo)); | |
2073 | // SST CF match | |
2074 | ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo)); | |
2075 | ||
2076 | // SST CF unknown | |
2077 | ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo)); | |
2078 | // SST CF unknown | |
2079 | ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo)); | |
2080 | // SST CF unknown | |
2081 | ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo)); | |
2082 | ||
2083 | // Cannot ingest a file into a dropped CF | |
2084 | ASSERT_OK(db_->DropColumnFamily(handles_[1])); | |
2085 | ASSERT_NOK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo)); | |
2086 | ||
2087 | // CF was not dropped, ok to Ingest | |
2088 | ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo)); | |
2089 | } | |
2090 | ||
11fdf7f2 | 2091 | /* |
f67539c2 TL |
2092 | * Test and verify the functionality of ingestion_options.move_files and |
2093 | * ingestion_options.failed_move_fall_back_to_copy | |
11fdf7f2 | 2094 | */ |
f67539c2 TL |
2095 | TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) { |
2096 | const bool fail_link = std::get<0>(GetParam()); | |
2097 | const bool failed_move_fall_back_to_copy = std::get<1>(GetParam()); | |
2098 | test_env_->set_fail_link(fail_link); | |
2099 | const EnvOptions env_options; | |
2100 | DestroyAndReopen(options_); | |
11fdf7f2 | 2101 | const int kNumKeys = 10000; |
f67539c2 TL |
2102 | IngestExternalFileOptions ifo; |
2103 | ifo.move_files = true; | |
2104 | ifo.failed_move_fall_back_to_copy = failed_move_fall_back_to_copy; | |
11fdf7f2 TL |
2105 | |
2106 | std::string file_path = sst_files_dir_ + "file1.sst"; | |
2107 | // Create SstFileWriter for default column family | |
f67539c2 | 2108 | SstFileWriter sst_file_writer(env_options, options_); |
11fdf7f2 TL |
2109 | ASSERT_OK(sst_file_writer.Open(file_path)); |
2110 | for (int i = 0; i < kNumKeys; i++) { | |
2111 | ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_value")); | |
2112 | } | |
2113 | ASSERT_OK(sst_file_writer.Finish()); | |
2114 | uint64_t file_size = 0; | |
2115 | ASSERT_OK(env_->GetFileSize(file_path, &file_size)); | |
2116 | ||
f67539c2 TL |
2117 | bool copyfile = false; |
2118 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( | |
2119 | "ExternalSstFileIngestionJob::Prepare:CopyFile", | |
2120 | [&](void* /* arg */) { copyfile = true; }); | |
2121 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); | |
2122 | ||
2123 | const Status s = db_->IngestExternalFile({file_path}, ifo); | |
11fdf7f2 TL |
2124 | |
2125 | ColumnFamilyHandleImpl* cfh = | |
2126 | static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily()); | |
2127 | ColumnFamilyData* cfd = cfh->cfd(); | |
2128 | const InternalStats* internal_stats_ptr = cfd->internal_stats(); | |
2129 | const std::vector<InternalStats::CompactionStats>& comp_stats = | |
2130 | internal_stats_ptr->TEST_GetCompactionStats(); | |
2131 | uint64_t bytes_copied = 0; | |
2132 | uint64_t bytes_moved = 0; | |
2133 | for (const auto& stats : comp_stats) { | |
2134 | bytes_copied += stats.bytes_written; | |
2135 | bytes_moved += stats.bytes_moved; | |
2136 | } | |
f67539c2 TL |
2137 | |
2138 | if (!fail_link) { | |
2139 | // Link operation succeeds. External SST should be moved. | |
2140 | ASSERT_OK(s); | |
11fdf7f2 TL |
2141 | ASSERT_EQ(0, bytes_copied); |
2142 | ASSERT_EQ(file_size, bytes_moved); | |
f67539c2 | 2143 | ASSERT_FALSE(copyfile); |
11fdf7f2 | 2144 | } else { |
f67539c2 TL |
2145 | // Link operation fails. |
2146 | ASSERT_EQ(0, bytes_moved); | |
2147 | if (failed_move_fall_back_to_copy) { | |
2148 | ASSERT_OK(s); | |
2149 | // Copy file is true since a failed link falls back to copy file. | |
2150 | ASSERT_TRUE(copyfile); | |
2151 | ASSERT_EQ(file_size, bytes_copied); | |
2152 | } else { | |
2153 | ASSERT_TRUE(s.IsNotSupported()); | |
2154 | // Copy file is false since a failed link does not fall back to copy file. | |
2155 | ASSERT_FALSE(copyfile); | |
2156 | ASSERT_EQ(0, bytes_copied); | |
2157 | } | |
11fdf7f2 | 2158 | } |
f67539c2 | 2159 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
2160 | } |
2161 | ||
7c673cae FG |
2162 | class TestIngestExternalFileListener : public EventListener { |
2163 | public: | |
11fdf7f2 | 2164 | void OnExternalFileIngested(DB* /*db*/, |
7c673cae FG |
2165 | const ExternalFileIngestionInfo& info) override { |
2166 | ingested_files.push_back(info); | |
2167 | } | |
2168 | ||
2169 | std::vector<ExternalFileIngestionInfo> ingested_files; | |
2170 | }; | |
2171 | ||
494da23a | 2172 | TEST_P(ExternalSSTFileTest, IngestionListener) { |
7c673cae FG |
2173 | Options options = CurrentOptions(); |
2174 | TestIngestExternalFileListener* listener = | |
2175 | new TestIngestExternalFileListener(); | |
2176 | options.listeners.emplace_back(listener); | |
2177 | CreateAndReopenWithCF({"koko", "toto"}, options); | |
2178 | ||
494da23a TL |
2179 | bool write_global_seqno = std::get<0>(GetParam()); |
2180 | bool verify_checksums_before_ingest = std::get<1>(GetParam()); | |
7c673cae | 2181 | // Ingest into default cf |
494da23a TL |
2182 | ASSERT_OK(GenerateAndAddExternalFile( |
2183 | options, {1, 2}, -1, true, write_global_seqno, | |
2184 | verify_checksums_before_ingest, false, true, nullptr, handles_[0])); | |
7c673cae FG |
2185 | ASSERT_EQ(listener->ingested_files.size(), 1); |
2186 | ASSERT_EQ(listener->ingested_files.back().cf_name, "default"); | |
2187 | ASSERT_EQ(listener->ingested_files.back().global_seqno, 0); | |
2188 | ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id, | |
2189 | 0); | |
2190 | ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name, | |
2191 | "default"); | |
2192 | ||
2193 | // Ingest into cf1 | |
494da23a TL |
2194 | ASSERT_OK(GenerateAndAddExternalFile( |
2195 | options, {1, 2}, -1, true, write_global_seqno, | |
2196 | verify_checksums_before_ingest, false, true, nullptr, handles_[1])); | |
7c673cae FG |
2197 | ASSERT_EQ(listener->ingested_files.size(), 2); |
2198 | ASSERT_EQ(listener->ingested_files.back().cf_name, "koko"); | |
2199 | ASSERT_EQ(listener->ingested_files.back().global_seqno, 0); | |
2200 | ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id, | |
2201 | 1); | |
2202 | ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name, | |
2203 | "koko"); | |
2204 | ||
2205 | // Ingest into cf2 | |
494da23a TL |
2206 | ASSERT_OK(GenerateAndAddExternalFile( |
2207 | options, {1, 2}, -1, true, write_global_seqno, | |
2208 | verify_checksums_before_ingest, false, true, nullptr, handles_[2])); | |
7c673cae FG |
2209 | ASSERT_EQ(listener->ingested_files.size(), 3); |
2210 | ASSERT_EQ(listener->ingested_files.back().cf_name, "toto"); | |
2211 | ASSERT_EQ(listener->ingested_files.back().global_seqno, 0); | |
2212 | ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id, | |
2213 | 2); | |
2214 | ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name, | |
2215 | "toto"); | |
2216 | } | |
2217 | ||
2218 | TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) { | |
2219 | Options options = CurrentOptions(); | |
2220 | DestroyAndReopen(options); | |
2221 | const int kNumKeys = 10000; | |
2222 | ||
2223 | // Insert keys using normal path and take a snapshot | |
2224 | for (int i = 0; i < kNumKeys; i++) { | |
2225 | ASSERT_OK(Put(Key(i), Key(i) + "_V1")); | |
2226 | } | |
2227 | const Snapshot* snap = db_->GetSnapshot(); | |
2228 | ||
2229 | // Overwrite all keys using IngestExternalFile | |
2230 | std::string sst_file_path = sst_files_dir_ + "file1.sst"; | |
2231 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
2232 | ASSERT_OK(sst_file_writer.Open(sst_file_path)); | |
2233 | for (int i = 0; i < kNumKeys; i++) { | |
11fdf7f2 | 2234 | ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_V2")); |
7c673cae FG |
2235 | } |
2236 | ASSERT_OK(sst_file_writer.Finish()); | |
2237 | ||
2238 | IngestExternalFileOptions ifo; | |
2239 | ifo.move_files = true; | |
2240 | ASSERT_OK(db_->IngestExternalFile({sst_file_path}, ifo)); | |
2241 | ||
2242 | for (int i = 0; i < kNumKeys; i++) { | |
2243 | ASSERT_EQ(Get(Key(i), snap), Key(i) + "_V1"); | |
2244 | ASSERT_EQ(Get(Key(i)), Key(i) + "_V2"); | |
2245 | } | |
2246 | ||
2247 | db_->ReleaseSnapshot(snap); | |
2248 | } | |
11fdf7f2 | 2249 | |
494da23a | 2250 | TEST_P(ExternalSSTFileTest, IngestBehind) { |
11fdf7f2 TL |
2251 | Options options = CurrentOptions(); |
2252 | options.compaction_style = kCompactionStyleUniversal; | |
2253 | options.num_levels = 3; | |
2254 | options.disable_auto_compactions = false; | |
2255 | DestroyAndReopen(options); | |
2256 | std::vector<std::pair<std::string, std::string>> file_data; | |
2257 | std::map<std::string, std::string> true_data; | |
2258 | ||
2259 | // Insert 100 -> 200 into the memtable | |
2260 | for (int i = 100; i <= 200; i++) { | |
2261 | ASSERT_OK(Put(Key(i), "memtable")); | |
2262 | true_data[Key(i)] = "memtable"; | |
2263 | } | |
2264 | ||
2265 | // Insert 100 -> 200 using IngestExternalFile | |
2266 | file_data.clear(); | |
2267 | for (int i = 0; i <= 20; i++) { | |
2268 | file_data.emplace_back(Key(i), "ingest_behind"); | |
2269 | } | |
2270 | ||
494da23a TL |
2271 | bool allow_global_seqno = true; |
2272 | bool ingest_behind = true; | |
2273 | bool write_global_seqno = std::get<0>(GetParam()); | |
2274 | bool verify_checksums_before_ingest = std::get<1>(GetParam()); | |
11fdf7f2 TL |
2275 | |
2276 | // Can't ingest behind since allow_ingest_behind isn't set to true | |
494da23a TL |
2277 | ASSERT_NOK(GenerateAndAddExternalFile( |
2278 | options, file_data, -1, allow_global_seqno, write_global_seqno, | |
2279 | verify_checksums_before_ingest, ingest_behind, false /*sort_data*/, | |
2280 | &true_data)); | |
11fdf7f2 TL |
2281 | |
2282 | options.allow_ingest_behind = true; | |
2283 | // check that we still can open the DB, as num_levels should be | |
2284 | // sanitized to 3 | |
2285 | options.num_levels = 2; | |
2286 | DestroyAndReopen(options); | |
2287 | ||
2288 | options.num_levels = 3; | |
2289 | DestroyAndReopen(options); | |
2290 | // Insert 100 -> 200 into the memtable | |
2291 | for (int i = 100; i <= 200; i++) { | |
2292 | ASSERT_OK(Put(Key(i), "memtable")); | |
2293 | true_data[Key(i)] = "memtable"; | |
2294 | } | |
2295 | db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
2296 | // Universal picker should go at second from the bottom level | |
2297 | ASSERT_EQ("0,1", FilesPerLevel()); | |
494da23a TL |
2298 | ASSERT_OK(GenerateAndAddExternalFile( |
2299 | options, file_data, -1, allow_global_seqno, write_global_seqno, | |
2300 | verify_checksums_before_ingest, true /*ingest_behind*/, | |
2301 | false /*sort_data*/, &true_data)); | |
11fdf7f2 TL |
2302 | ASSERT_EQ("0,1,1", FilesPerLevel()); |
2303 | // this time ingest should fail as the file doesn't fit to the bottom level | |
494da23a TL |
2304 | ASSERT_NOK(GenerateAndAddExternalFile( |
2305 | options, file_data, -1, allow_global_seqno, write_global_seqno, | |
2306 | verify_checksums_before_ingest, true /*ingest_behind*/, | |
2307 | false /*sort_data*/, &true_data)); | |
11fdf7f2 TL |
2308 | ASSERT_EQ("0,1,1", FilesPerLevel()); |
2309 | db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
2310 | // bottom level should be empty | |
2311 | ASSERT_EQ("0,1", FilesPerLevel()); | |
2312 | ||
2313 | size_t kcnt = 0; | |
2314 | VerifyDBFromMap(true_data, &kcnt, false); | |
2315 | } | |
2316 | ||
2317 | TEST_F(ExternalSSTFileTest, SkipBloomFilter) { | |
2318 | Options options = CurrentOptions(); | |
2319 | ||
2320 | BlockBasedTableOptions table_options; | |
2321 | table_options.filter_policy.reset(NewBloomFilterPolicy(10)); | |
2322 | table_options.cache_index_and_filter_blocks = true; | |
2323 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
2324 | ||
2325 | ||
2326 | // Create external SST file and include bloom filters | |
f67539c2 | 2327 | options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); |
11fdf7f2 TL |
2328 | DestroyAndReopen(options); |
2329 | { | |
2330 | std::string file_path = sst_files_dir_ + "sst_with_bloom.sst"; | |
2331 | SstFileWriter sst_file_writer(EnvOptions(), options); | |
2332 | ASSERT_OK(sst_file_writer.Open(file_path)); | |
2333 | ASSERT_OK(sst_file_writer.Put("Key1", "Value1")); | |
2334 | ASSERT_OK(sst_file_writer.Finish()); | |
2335 | ||
2336 | ASSERT_OK( | |
2337 | db_->IngestExternalFile({file_path}, IngestExternalFileOptions())); | |
2338 | ||
2339 | ASSERT_EQ(Get("Key1"), "Value1"); | |
2340 | ASSERT_GE( | |
2341 | options.statistics->getTickerCount(Tickers::BLOCK_CACHE_FILTER_ADD), 1); | |
2342 | } | |
2343 | ||
2344 | // Create external SST file but skip bloom filters | |
f67539c2 | 2345 | options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); |
11fdf7f2 TL |
2346 | DestroyAndReopen(options); |
2347 | { | |
2348 | std::string file_path = sst_files_dir_ + "sst_with_no_bloom.sst"; | |
2349 | SstFileWriter sst_file_writer(EnvOptions(), options, nullptr, true, | |
2350 | Env::IOPriority::IO_TOTAL, | |
2351 | true /* skip_filters */); | |
2352 | ASSERT_OK(sst_file_writer.Open(file_path)); | |
2353 | ASSERT_OK(sst_file_writer.Put("Key1", "Value1")); | |
2354 | ASSERT_OK(sst_file_writer.Finish()); | |
2355 | ||
2356 | ASSERT_OK( | |
2357 | db_->IngestExternalFile({file_path}, IngestExternalFileOptions())); | |
2358 | ||
2359 | ASSERT_EQ(Get("Key1"), "Value1"); | |
2360 | ASSERT_EQ( | |
2361 | options.statistics->getTickerCount(Tickers::BLOCK_CACHE_FILTER_ADD), 0); | |
2362 | } | |
2363 | } | |
2364 | ||
494da23a TL |
2365 | TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) { |
2366 | if (!ZSTD_Supported()) { | |
2367 | return; | |
2368 | } | |
2369 | const int kNumEntries = 1 << 10; | |
2370 | const int kNumBytesPerEntry = 1 << 10; | |
2371 | Options options = CurrentOptions(); | |
2372 | options.compression = kZSTD; | |
2373 | options.compression_opts.max_dict_bytes = 1 << 14; // 16KB | |
2374 | options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB | |
2375 | DestroyAndReopen(options); | |
2376 | ||
2377 | std::atomic<int> num_compression_dicts(0); | |
f67539c2 | 2378 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
494da23a TL |
2379 | "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict", |
2380 | [&](void* /* arg */) { ++num_compression_dicts; }); | |
f67539c2 | 2381 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
494da23a TL |
2382 | |
2383 | Random rnd(301); | |
2384 | std::vector<std::pair<std::string, std::string>> random_data; | |
2385 | for (int i = 0; i < kNumEntries; i++) { | |
2386 | std::string val; | |
2387 | test::RandomString(&rnd, kNumBytesPerEntry, &val); | |
2388 | random_data.emplace_back(Key(i), std::move(val)); | |
2389 | } | |
2390 | ASSERT_OK(GenerateAndAddExternalFile(options, std::move(random_data))); | |
2391 | ASSERT_EQ(1, num_compression_dicts); | |
2392 | } | |
2393 | ||
2394 | TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) { | |
2395 | std::unique_ptr<FaultInjectionTestEnv> fault_injection_env( | |
2396 | new FaultInjectionTestEnv(env_)); | |
2397 | Options options = CurrentOptions(); | |
2398 | options.env = fault_injection_env.get(); | |
f67539c2 | 2399 | CreateAndReopenWithCF({"pikachu", "eevee"}, options); |
494da23a TL |
2400 | std::vector<ColumnFamilyHandle*> column_families; |
2401 | column_families.push_back(handles_[0]); | |
2402 | column_families.push_back(handles_[1]); | |
f67539c2 | 2403 | column_families.push_back(handles_[2]); |
494da23a TL |
2404 | std::vector<IngestExternalFileOptions> ifos(column_families.size()); |
2405 | for (auto& ifo : ifos) { | |
2406 | ifo.allow_global_seqno = true; // Always allow global_seqno | |
2407 | // May or may not write global_seqno | |
2408 | ifo.write_global_seqno = std::get<0>(GetParam()); | |
2409 | // Whether to verify checksums before ingestion | |
2410 | ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); | |
2411 | } | |
2412 | std::vector<std::vector<std::pair<std::string, std::string>>> data; | |
2413 | data.push_back( | |
2414 | {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); | |
2415 | data.push_back( | |
2416 | {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); | |
f67539c2 TL |
2417 | data.push_back( |
2418 | {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")}); | |
2419 | ||
494da23a TL |
2420 | // Resize the true_data vector upon construction to avoid re-alloc |
2421 | std::vector<std::map<std::string, std::string>> true_data( | |
2422 | column_families.size()); | |
2423 | Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, | |
2424 | -1, true, true_data); | |
2425 | ASSERT_OK(s); | |
2426 | Close(); | |
f67539c2 TL |
2427 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, |
2428 | options); | |
2429 | ASSERT_EQ(3, handles_.size()); | |
494da23a TL |
2430 | int cf = 0; |
2431 | for (const auto& verify_map : true_data) { | |
2432 | for (const auto& elem : verify_map) { | |
2433 | const std::string& key = elem.first; | |
2434 | const std::string& value = elem.second; | |
2435 | ASSERT_EQ(value, Get(cf, key)); | |
2436 | } | |
2437 | ++cf; | |
2438 | } | |
2439 | Close(); | |
2440 | Destroy(options, true /* delete_cf_paths */); | |
2441 | } | |
2442 | ||
2443 | TEST_P(ExternalSSTFileTest, | |
2444 | IngestFilesIntoMultipleColumnFamilies_NoMixedStateWithSnapshot) { | |
2445 | std::unique_ptr<FaultInjectionTestEnv> fault_injection_env( | |
2446 | new FaultInjectionTestEnv(env_)); | |
2447 | SyncPoint::GetInstance()->DisableProcessing(); | |
2448 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2449 | SyncPoint::GetInstance()->LoadDependency({ | |
2450 | {"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0", | |
2451 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" | |
2452 | "BeforeRead"}, | |
2453 | {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" | |
2454 | "AfterRead", | |
2455 | "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"}, | |
2456 | }); | |
2457 | SyncPoint::GetInstance()->EnableProcessing(); | |
2458 | ||
2459 | Options options = CurrentOptions(); | |
2460 | options.env = fault_injection_env.get(); | |
f67539c2 | 2461 | CreateAndReopenWithCF({"pikachu", "eevee"}, options); |
494da23a TL |
2462 | const std::vector<std::map<std::string, std::string>> data_before_ingestion = |
2463 | {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}}, | |
f67539c2 TL |
2464 | {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}, |
2465 | {{"bar4", "bv4_0"}, {"bar5", "bv5_0"}, {"bar6", "bv6_0"}}}; | |
494da23a TL |
2466 | for (size_t i = 0; i != handles_.size(); ++i) { |
2467 | int cf = static_cast<int>(i); | |
2468 | const auto& orig_data = data_before_ingestion[i]; | |
2469 | for (const auto& kv : orig_data) { | |
2470 | ASSERT_OK(Put(cf, kv.first, kv.second)); | |
2471 | } | |
2472 | ASSERT_OK(Flush(cf)); | |
2473 | } | |
2474 | ||
2475 | std::vector<ColumnFamilyHandle*> column_families; | |
2476 | column_families.push_back(handles_[0]); | |
2477 | column_families.push_back(handles_[1]); | |
f67539c2 | 2478 | column_families.push_back(handles_[2]); |
494da23a TL |
2479 | std::vector<IngestExternalFileOptions> ifos(column_families.size()); |
2480 | for (auto& ifo : ifos) { | |
2481 | ifo.allow_global_seqno = true; // Always allow global_seqno | |
2482 | // May or may not write global_seqno | |
2483 | ifo.write_global_seqno = std::get<0>(GetParam()); | |
2484 | // Whether to verify checksums before ingestion | |
2485 | ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); | |
2486 | } | |
2487 | std::vector<std::vector<std::pair<std::string, std::string>>> data; | |
2488 | data.push_back( | |
2489 | {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); | |
2490 | data.push_back( | |
2491 | {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); | |
f67539c2 TL |
2492 | data.push_back( |
2493 | {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")}); | |
494da23a TL |
2494 | // Resize the true_data vector upon construction to avoid re-alloc |
2495 | std::vector<std::map<std::string, std::string>> true_data( | |
2496 | column_families.size()); | |
2497 | // Take snapshot before ingestion starts | |
2498 | ReadOptions read_opts; | |
2499 | read_opts.total_order_seek = true; | |
2500 | read_opts.snapshot = dbfull()->GetSnapshot(); | |
2501 | std::vector<Iterator*> iters(handles_.size()); | |
2502 | ||
2503 | // Range scan checks first kv of each CF before ingestion starts. | |
2504 | for (size_t i = 0; i != handles_.size(); ++i) { | |
2505 | iters[i] = dbfull()->NewIterator(read_opts, handles_[i]); | |
2506 | iters[i]->SeekToFirst(); | |
2507 | ASSERT_TRUE(iters[i]->Valid()); | |
2508 | const std::string& key = iters[i]->key().ToString(); | |
2509 | const std::string& value = iters[i]->value().ToString(); | |
2510 | const std::map<std::string, std::string>& orig_data = | |
2511 | data_before_ingestion[i]; | |
2512 | std::map<std::string, std::string>::const_iterator it = orig_data.find(key); | |
2513 | ASSERT_NE(orig_data.end(), it); | |
2514 | ASSERT_EQ(it->second, value); | |
2515 | iters[i]->Next(); | |
2516 | } | |
2517 | port::Thread ingest_thread([&]() { | |
2518 | ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data, | |
2519 | -1, true, true_data)); | |
2520 | }); | |
2521 | TEST_SYNC_POINT( | |
2522 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" | |
2523 | "BeforeRead"); | |
2524 | // Should see only data before ingestion | |
2525 | for (size_t i = 0; i != handles_.size(); ++i) { | |
2526 | const auto& orig_data = data_before_ingestion[i]; | |
2527 | for (; iters[i]->Valid(); iters[i]->Next()) { | |
2528 | const std::string& key = iters[i]->key().ToString(); | |
2529 | const std::string& value = iters[i]->value().ToString(); | |
2530 | std::map<std::string, std::string>::const_iterator it = | |
2531 | orig_data.find(key); | |
2532 | ASSERT_NE(orig_data.end(), it); | |
2533 | ASSERT_EQ(it->second, value); | |
2534 | } | |
2535 | } | |
2536 | TEST_SYNC_POINT( | |
2537 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" | |
2538 | "AfterRead"); | |
2539 | ingest_thread.join(); | |
2540 | for (auto* iter : iters) { | |
2541 | delete iter; | |
2542 | } | |
2543 | iters.clear(); | |
2544 | dbfull()->ReleaseSnapshot(read_opts.snapshot); | |
2545 | ||
2546 | Close(); | |
f67539c2 TL |
2547 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, |
2548 | options); | |
494da23a TL |
2549 | // Should see consistent state after ingestion for all column families even |
2550 | // without snapshot. | |
f67539c2 | 2551 | ASSERT_EQ(3, handles_.size()); |
494da23a TL |
2552 | int cf = 0; |
2553 | for (const auto& verify_map : true_data) { | |
2554 | for (const auto& elem : verify_map) { | |
2555 | const std::string& key = elem.first; | |
2556 | const std::string& value = elem.second; | |
2557 | ASSERT_EQ(value, Get(cf, key)); | |
2558 | } | |
2559 | ++cf; | |
2560 | } | |
2561 | Close(); | |
2562 | Destroy(options, true /* delete_cf_paths */); | |
2563 | } | |
2564 | ||
2565 | TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) { | |
2566 | std::unique_ptr<FaultInjectionTestEnv> fault_injection_env( | |
2567 | new FaultInjectionTestEnv(env_)); | |
2568 | Options options = CurrentOptions(); | |
2569 | options.env = fault_injection_env.get(); | |
2570 | SyncPoint::GetInstance()->DisableProcessing(); | |
2571 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2572 | SyncPoint::GetInstance()->LoadDependency({ | |
2573 | {"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0", | |
2574 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:" | |
2575 | "0"}, | |
2576 | {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:" | |
2577 | "1", | |
2578 | "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"}, | |
2579 | }); | |
2580 | SyncPoint::GetInstance()->EnableProcessing(); | |
f67539c2 | 2581 | CreateAndReopenWithCF({"pikachu", "eevee"}, options); |
494da23a TL |
2582 | std::vector<ColumnFamilyHandle*> column_families; |
2583 | column_families.push_back(handles_[0]); | |
2584 | column_families.push_back(handles_[1]); | |
f67539c2 | 2585 | column_families.push_back(handles_[2]); |
494da23a TL |
2586 | std::vector<IngestExternalFileOptions> ifos(column_families.size()); |
2587 | for (auto& ifo : ifos) { | |
2588 | ifo.allow_global_seqno = true; // Always allow global_seqno | |
2589 | // May or may not write global_seqno | |
2590 | ifo.write_global_seqno = std::get<0>(GetParam()); | |
2591 | // Whether to verify block checksums before ingest | |
2592 | ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); | |
2593 | } | |
2594 | std::vector<std::vector<std::pair<std::string, std::string>>> data; | |
2595 | data.push_back( | |
2596 | {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); | |
2597 | data.push_back( | |
2598 | {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); | |
f67539c2 TL |
2599 | data.push_back( |
2600 | {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")}); | |
2601 | ||
494da23a TL |
2602 | // Resize the true_data vector upon construction to avoid re-alloc |
2603 | std::vector<std::map<std::string, std::string>> true_data( | |
2604 | column_families.size()); | |
2605 | port::Thread ingest_thread([&]() { | |
2606 | Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, | |
2607 | -1, true, true_data); | |
2608 | ASSERT_NOK(s); | |
2609 | }); | |
2610 | TEST_SYNC_POINT( | |
2611 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:" | |
2612 | "0"); | |
2613 | fault_injection_env->SetFilesystemActive(false); | |
2614 | TEST_SYNC_POINT( | |
2615 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:" | |
2616 | "1"); | |
2617 | ingest_thread.join(); | |
2618 | ||
2619 | fault_injection_env->SetFilesystemActive(true); | |
2620 | Close(); | |
f67539c2 TL |
2621 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, |
2622 | options); | |
2623 | ASSERT_EQ(3, handles_.size()); | |
494da23a TL |
2624 | int cf = 0; |
2625 | for (const auto& verify_map : true_data) { | |
2626 | for (const auto& elem : verify_map) { | |
2627 | const std::string& key = elem.first; | |
2628 | ASSERT_EQ("NOT_FOUND", Get(cf, key)); | |
2629 | } | |
2630 | ++cf; | |
2631 | } | |
2632 | Close(); | |
2633 | Destroy(options, true /* delete_cf_paths */); | |
2634 | } | |
2635 | ||
2636 | TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) { | |
2637 | std::unique_ptr<FaultInjectionTestEnv> fault_injection_env( | |
2638 | new FaultInjectionTestEnv(env_)); | |
2639 | Options options = CurrentOptions(); | |
2640 | options.env = fault_injection_env.get(); | |
2641 | SyncPoint::GetInstance()->DisableProcessing(); | |
2642 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2643 | SyncPoint::GetInstance()->LoadDependency({ | |
2644 | {"DBImpl::IngestExternalFiles:BeforeJobsRun:0", | |
2645 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" | |
2646 | "0"}, | |
2647 | {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" | |
2648 | "1", | |
2649 | "DBImpl::IngestExternalFiles:BeforeJobsRun:1"}, | |
2650 | }); | |
2651 | SyncPoint::GetInstance()->EnableProcessing(); | |
f67539c2 | 2652 | CreateAndReopenWithCF({"pikachu", "eevee"}, options); |
494da23a TL |
2653 | std::vector<ColumnFamilyHandle*> column_families; |
2654 | column_families.push_back(handles_[0]); | |
2655 | column_families.push_back(handles_[1]); | |
f67539c2 | 2656 | column_families.push_back(handles_[2]); |
494da23a TL |
2657 | std::vector<IngestExternalFileOptions> ifos(column_families.size()); |
2658 | for (auto& ifo : ifos) { | |
2659 | ifo.allow_global_seqno = true; // Always allow global_seqno | |
2660 | // May or may not write global_seqno | |
2661 | ifo.write_global_seqno = std::get<0>(GetParam()); | |
2662 | // Whether to verify block checksums before ingestion | |
2663 | ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); | |
2664 | } | |
2665 | std::vector<std::vector<std::pair<std::string, std::string>>> data; | |
2666 | data.push_back( | |
2667 | {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); | |
2668 | data.push_back( | |
2669 | {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); | |
f67539c2 TL |
2670 | data.push_back( |
2671 | {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")}); | |
494da23a TL |
2672 | // Resize the true_data vector upon construction to avoid re-alloc |
2673 | std::vector<std::map<std::string, std::string>> true_data( | |
2674 | column_families.size()); | |
2675 | port::Thread ingest_thread([&]() { | |
2676 | Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, | |
2677 | -1, true, true_data); | |
2678 | ASSERT_NOK(s); | |
2679 | }); | |
2680 | TEST_SYNC_POINT( | |
2681 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" | |
2682 | "0"); | |
2683 | fault_injection_env->SetFilesystemActive(false); | |
2684 | TEST_SYNC_POINT( | |
2685 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" | |
2686 | "1"); | |
2687 | ingest_thread.join(); | |
2688 | ||
2689 | fault_injection_env->SetFilesystemActive(true); | |
2690 | Close(); | |
f67539c2 TL |
2691 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, |
2692 | options); | |
2693 | ASSERT_EQ(3, handles_.size()); | |
494da23a TL |
2694 | int cf = 0; |
2695 | for (const auto& verify_map : true_data) { | |
2696 | for (const auto& elem : verify_map) { | |
2697 | const std::string& key = elem.first; | |
2698 | ASSERT_EQ("NOT_FOUND", Get(cf, key)); | |
2699 | } | |
2700 | ++cf; | |
2701 | } | |
2702 | Close(); | |
2703 | Destroy(options, true /* delete_cf_paths */); | |
2704 | } | |
2705 | ||
2706 | TEST_P(ExternalSSTFileTest, | |
2707 | IngestFilesIntoMultipleColumnFamilies_PartialManifestWriteFail) { | |
2708 | std::unique_ptr<FaultInjectionTestEnv> fault_injection_env( | |
2709 | new FaultInjectionTestEnv(env_)); | |
2710 | Options options = CurrentOptions(); | |
2711 | options.env = fault_injection_env.get(); | |
2712 | ||
f67539c2 | 2713 | CreateAndReopenWithCF({"pikachu", "eevee"}, options); |
494da23a TL |
2714 | |
2715 | SyncPoint::GetInstance()->ClearTrace(); | |
2716 | SyncPoint::GetInstance()->DisableProcessing(); | |
2717 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2718 | SyncPoint::GetInstance()->LoadDependency({ | |
2719 | {"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0", | |
2720 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" | |
2721 | "PartialManifestWriteFail:0"}, | |
2722 | {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" | |
2723 | "PartialManifestWriteFail:1", | |
2724 | "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"}, | |
2725 | }); | |
2726 | SyncPoint::GetInstance()->EnableProcessing(); | |
2727 | ||
2728 | std::vector<ColumnFamilyHandle*> column_families; | |
2729 | column_families.push_back(handles_[0]); | |
2730 | column_families.push_back(handles_[1]); | |
f67539c2 | 2731 | column_families.push_back(handles_[2]); |
494da23a TL |
2732 | std::vector<IngestExternalFileOptions> ifos(column_families.size()); |
2733 | for (auto& ifo : ifos) { | |
2734 | ifo.allow_global_seqno = true; // Always allow global_seqno | |
2735 | // May or may not write global_seqno | |
2736 | ifo.write_global_seqno = std::get<0>(GetParam()); | |
2737 | // Whether to verify block checksums before ingestion | |
2738 | ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); | |
2739 | } | |
2740 | std::vector<std::vector<std::pair<std::string, std::string>>> data; | |
2741 | data.push_back( | |
2742 | {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); | |
2743 | data.push_back( | |
2744 | {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); | |
f67539c2 TL |
2745 | data.push_back( |
2746 | {std::make_pair("bar3", "bv3"), std::make_pair("bar4", "bv4")}); | |
494da23a TL |
2747 | // Resize the true_data vector upon construction to avoid re-alloc |
2748 | std::vector<std::map<std::string, std::string>> true_data( | |
2749 | column_families.size()); | |
2750 | port::Thread ingest_thread([&]() { | |
2751 | Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, | |
2752 | -1, true, true_data); | |
2753 | ASSERT_NOK(s); | |
2754 | }); | |
2755 | TEST_SYNC_POINT( | |
2756 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" | |
2757 | "PartialManifestWriteFail:0"); | |
2758 | fault_injection_env->SetFilesystemActive(false); | |
2759 | TEST_SYNC_POINT( | |
2760 | "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" | |
2761 | "PartialManifestWriteFail:1"); | |
2762 | ingest_thread.join(); | |
2763 | ||
2764 | fault_injection_env->DropUnsyncedFileData(); | |
2765 | fault_injection_env->SetFilesystemActive(true); | |
2766 | Close(); | |
f67539c2 TL |
2767 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, |
2768 | options); | |
2769 | ASSERT_EQ(3, handles_.size()); | |
494da23a TL |
2770 | int cf = 0; |
2771 | for (const auto& verify_map : true_data) { | |
2772 | for (const auto& elem : verify_map) { | |
2773 | const std::string& key = elem.first; | |
2774 | ASSERT_EQ("NOT_FOUND", Get(cf, key)); | |
2775 | } | |
2776 | ++cf; | |
2777 | } | |
2778 | Close(); | |
2779 | Destroy(options, true /* delete_cf_paths */); | |
2780 | } | |
2781 | ||
f67539c2 TL |
2782 | TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) { |
2783 | Options options = CurrentOptions(); | |
2784 | // Use large buffer to avoid memtable flush | |
2785 | options.write_buffer_size = 1024 * 1024; | |
2786 | options.two_write_queues = true; | |
2787 | DestroyAndReopen(options); | |
2788 | ||
2789 | ASSERT_OK(dbfull()->Put(WriteOptions(), "1000", "v1")); | |
2790 | ASSERT_OK(dbfull()->Put(WriteOptions(), "1001", "v1")); | |
2791 | ASSERT_OK(dbfull()->Put(WriteOptions(), "9999", "v1")); | |
2792 | ||
2793 | // Put one key which is overlap with keys in memtable. | |
2794 | // It will trigger flushing memtable and require this thread is | |
2795 | // currently at the front of the 2nd writer queue. We must make | |
2796 | // sure that it won't enter the 2nd writer queue for the second time. | |
2797 | std::vector<std::pair<std::string, std::string>> data; | |
2798 | data.push_back(std::make_pair("1001", "v2")); | |
2799 | GenerateAndAddExternalFile(options, data); | |
2800 | } | |
2801 | ||
494da23a TL |
2802 | INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, |
2803 | testing::Values(std::make_tuple(false, false), | |
2804 | std::make_tuple(false, true), | |
2805 | std::make_tuple(true, false), | |
2806 | std::make_tuple(true, true))); | |
2807 | ||
f67539c2 TL |
2808 | INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest, |
2809 | ExternSSTFileLinkFailFallbackTest, | |
2810 | testing::Values(std::make_tuple(true, false), | |
2811 | std::make_tuple(true, true), | |
2812 | std::make_tuple(false, false))); | |
2813 | ||
2814 | } // namespace ROCKSDB_NAMESPACE | |
7c673cae FG |
2815 | |
2816 | int main(int argc, char** argv) { | |
f67539c2 | 2817 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); |
7c673cae FG |
2818 | ::testing::InitGoogleTest(&argc, argv); |
2819 | return RUN_ALL_TESTS(); | |
2820 | } | |
2821 | ||
2822 | #else | |
2823 | #include <stdio.h> | |
2824 | ||
// ROCKSDB_LITE entry point: prints a notice to stderr that the tests are
// skipped and exits with status 0.
int main(int /*argc*/, char** /*argv*/) {
  fputs(
      "SKIPPED as External SST File Writer and Ingestion are not supported "
      "in ROCKSDB_LITE\n",
      stderr);
  return 0;
}
2831 | ||
2832 | #endif // !ROCKSDB_LITE |