git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/import_column_family_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / import_column_family_test.cc
1 #ifndef ROCKSDB_LITE
2
3 #include <functional>
4
5 #include "db/db_test_util.h"
6 #include "port/port.h"
7 #include "port/stack_trace.h"
8 #include "rocksdb/sst_file_writer.h"
9 #include "test_util/testutil.h"
10 #include "util/random.h"
11
12 namespace ROCKSDB_NAMESPACE {
13
14 class ImportColumnFamilyTest : public DBTestBase {
15 public:
16 ImportColumnFamilyTest()
17 : DBTestBase("/import_column_family_test", /*env_do_fsync=*/true) {
18 sst_files_dir_ = dbname_ + "/sst_files/";
19 export_files_dir_ = test::PerThreadDBPath(env_, "export");
20 DestroyAndRecreateExternalSSTFilesDir();
21 import_cfh_ = nullptr;
22 import_cfh2_ = nullptr;
23 metadata_ptr_ = nullptr;
24 }
25
26 ~ImportColumnFamilyTest() {
27 if (import_cfh_) {
28 EXPECT_OK(db_->DropColumnFamily(import_cfh_));
29 EXPECT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
30 import_cfh_ = nullptr;
31 }
32 if (import_cfh2_) {
33 EXPECT_OK(db_->DropColumnFamily(import_cfh2_));
34 EXPECT_OK(db_->DestroyColumnFamilyHandle(import_cfh2_));
35 import_cfh2_ = nullptr;
36 }
37 if (metadata_ptr_) {
38 delete metadata_ptr_;
39 metadata_ptr_ = nullptr;
40 }
41 EXPECT_OK(DestroyDir(env_, sst_files_dir_));
42 EXPECT_OK(DestroyDir(env_, export_files_dir_));
43 }
44
45 void DestroyAndRecreateExternalSSTFilesDir() {
46 EXPECT_OK(DestroyDir(env_, sst_files_dir_));
47 EXPECT_OK(env_->CreateDir(sst_files_dir_));
48 EXPECT_OK(DestroyDir(env_, export_files_dir_));
49 }
50
51 LiveFileMetaData LiveFileMetaDataInit(std::string name, std::string path,
52 int level,
53 SequenceNumber smallest_seqno,
54 SequenceNumber largest_seqno) {
55 LiveFileMetaData metadata;
56 metadata.name = name;
57 metadata.db_path = path;
58 metadata.smallest_seqno = smallest_seqno;
59 metadata.largest_seqno = largest_seqno;
60 metadata.level = level;
61 return metadata;
62 }
63
64 protected:
65 std::string sst_files_dir_;
66 std::string export_files_dir_;
67 ColumnFamilyHandle* import_cfh_;
68 ColumnFamilyHandle* import_cfh2_;
69 ExportImportFilesMetaData* metadata_ptr_;
70 };
71
72 TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) {
73 Options options = CurrentOptions();
74 CreateAndReopenWithCF({"koko"}, options);
75
76 SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
77 SstFileWriter sfw_unknown(EnvOptions(), options);
78
79 // cf1.sst
80 const std::string cf1_sst_name = "cf1.sst";
81 const std::string cf1_sst = sst_files_dir_ + cf1_sst_name;
82 ASSERT_OK(sfw_cf1.Open(cf1_sst));
83 ASSERT_OK(sfw_cf1.Put("K1", "V1"));
84 ASSERT_OK(sfw_cf1.Put("K2", "V2"));
85 ASSERT_OK(sfw_cf1.Finish());
86
87 // cf_unknown.sst
88 const std::string unknown_sst_name = "cf_unknown.sst";
89 const std::string unknown_sst = sst_files_dir_ + unknown_sst_name;
90 ASSERT_OK(sfw_unknown.Open(unknown_sst));
91 ASSERT_OK(sfw_unknown.Put("K3", "V1"));
92 ASSERT_OK(sfw_unknown.Put("K4", "V2"));
93 ASSERT_OK(sfw_unknown.Finish());
94
95 {
96 // Import sst file corresponding to cf1 onto a new cf and verify
97 ExportImportFilesMetaData metadata;
98 metadata.files.push_back(
99 LiveFileMetaDataInit(cf1_sst_name, sst_files_dir_, 0, 10, 19));
100 metadata.db_comparator_name = options.comparator->Name();
101
102 ASSERT_OK(db_->CreateColumnFamilyWithImport(
103 options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_));
104 ASSERT_NE(import_cfh_, nullptr);
105
106 std::string value;
107 db_->Get(ReadOptions(), import_cfh_, "K1", &value);
108 ASSERT_EQ(value, "V1");
109 db_->Get(ReadOptions(), import_cfh_, "K2", &value);
110 ASSERT_EQ(value, "V2");
111 ASSERT_OK(db_->DropColumnFamily(import_cfh_));
112 ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
113 import_cfh_ = nullptr;
114 }
115
116 {
117 // Import sst file corresponding to unknown cf onto a new cf and verify
118 ExportImportFilesMetaData metadata;
119 metadata.files.push_back(
120 LiveFileMetaDataInit(unknown_sst_name, sst_files_dir_, 0, 20, 29));
121 metadata.db_comparator_name = options.comparator->Name();
122
123 ASSERT_OK(db_->CreateColumnFamilyWithImport(
124 options, "yoyo", ImportColumnFamilyOptions(), metadata, &import_cfh_));
125 ASSERT_NE(import_cfh_, nullptr);
126
127 std::string value;
128 db_->Get(ReadOptions(), import_cfh_, "K3", &value);
129 ASSERT_EQ(value, "V1");
130 db_->Get(ReadOptions(), import_cfh_, "K4", &value);
131 ASSERT_EQ(value, "V2");
132 }
133 }
134
135 TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) {
136 Options options = CurrentOptions();
137 CreateAndReopenWithCF({"koko"}, options);
138
139 SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
140
141 // file3.sst
142 const std::string file3_sst_name = "file3.sst";
143 const std::string file3_sst = sst_files_dir_ + file3_sst_name;
144 ASSERT_OK(sfw_cf1.Open(file3_sst));
145 for (int i = 0; i < 100; ++i) {
146 ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + "_val"));
147 }
148 ASSERT_OK(sfw_cf1.Finish());
149
150 // file2.sst
151 const std::string file2_sst_name = "file2.sst";
152 const std::string file2_sst = sst_files_dir_ + file2_sst_name;
153 ASSERT_OK(sfw_cf1.Open(file2_sst));
154 for (int i = 0; i < 100; i += 2) {
155 ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + "_overwrite1"));
156 }
157 ASSERT_OK(sfw_cf1.Finish());
158
159 // file1a.sst
160 const std::string file1a_sst_name = "file1a.sst";
161 const std::string file1a_sst = sst_files_dir_ + file1a_sst_name;
162 ASSERT_OK(sfw_cf1.Open(file1a_sst));
163 for (int i = 0; i < 52; i += 4) {
164 ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"));
165 }
166 ASSERT_OK(sfw_cf1.Finish());
167
168 // file1b.sst
169 const std::string file1b_sst_name = "file1b.sst";
170 const std::string file1b_sst = sst_files_dir_ + file1b_sst_name;
171 ASSERT_OK(sfw_cf1.Open(file1b_sst));
172 for (int i = 52; i < 100; i += 4) {
173 ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + "_overwrite2"));
174 }
175 ASSERT_OK(sfw_cf1.Finish());
176
177 // file0a.sst
178 const std::string file0a_sst_name = "file0a.sst";
179 const std::string file0a_sst = sst_files_dir_ + file0a_sst_name;
180 ASSERT_OK(sfw_cf1.Open(file0a_sst));
181 for (int i = 0; i < 100; i += 16) {
182 ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + "_overwrite3"));
183 }
184 ASSERT_OK(sfw_cf1.Finish());
185
186 // file0b.sst
187 const std::string file0b_sst_name = "file0b.sst";
188 const std::string file0b_sst = sst_files_dir_ + file0b_sst_name;
189 ASSERT_OK(sfw_cf1.Open(file0b_sst));
190 for (int i = 0; i < 100; i += 16) {
191 ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + "_overwrite4"));
192 }
193 ASSERT_OK(sfw_cf1.Finish());
194
195 // Import sst files and verify
196 ExportImportFilesMetaData metadata;
197 metadata.files.push_back(
198 LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 3, 10, 19));
199 metadata.files.push_back(
200 LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 2, 20, 29));
201 metadata.files.push_back(
202 LiveFileMetaDataInit(file1a_sst_name, sst_files_dir_, 1, 30, 34));
203 metadata.files.push_back(
204 LiveFileMetaDataInit(file1b_sst_name, sst_files_dir_, 1, 35, 39));
205 metadata.files.push_back(
206 LiveFileMetaDataInit(file0a_sst_name, sst_files_dir_, 0, 40, 49));
207 metadata.files.push_back(
208 LiveFileMetaDataInit(file0b_sst_name, sst_files_dir_, 0, 50, 59));
209 metadata.db_comparator_name = options.comparator->Name();
210
211 ASSERT_OK(db_->CreateColumnFamilyWithImport(
212 options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_));
213 ASSERT_NE(import_cfh_, nullptr);
214
215 for (int i = 0; i < 100; i++) {
216 std::string value;
217 db_->Get(ReadOptions(), import_cfh_, Key(i), &value);
218 if (i % 16 == 0) {
219 ASSERT_EQ(value, Key(i) + "_overwrite4");
220 } else if (i % 4 == 0) {
221 ASSERT_EQ(value, Key(i) + "_overwrite2");
222 } else if (i % 2 == 0) {
223 ASSERT_EQ(value, Key(i) + "_overwrite1");
224 } else {
225 ASSERT_EQ(value, Key(i) + "_val");
226 }
227 }
228
229 for (int i = 0; i < 100; i += 5) {
230 ASSERT_OK(
231 db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite5"));
232 }
233
234 // Flush and check again
235 ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_));
236 for (int i = 0; i < 100; i++) {
237 std::string value;
238 db_->Get(ReadOptions(), import_cfh_, Key(i), &value);
239 if (i % 5 == 0) {
240 ASSERT_EQ(value, Key(i) + "_overwrite5");
241 } else if (i % 16 == 0) {
242 ASSERT_EQ(value, Key(i) + "_overwrite4");
243 } else if (i % 4 == 0) {
244 ASSERT_EQ(value, Key(i) + "_overwrite2");
245 } else if (i % 2 == 0) {
246 ASSERT_EQ(value, Key(i) + "_overwrite1");
247 } else {
248 ASSERT_EQ(value, Key(i) + "_val");
249 }
250 }
251
252 // Compact and check again.
253 ASSERT_OK(
254 db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr));
255 for (int i = 0; i < 100; i++) {
256 std::string value;
257 db_->Get(ReadOptions(), import_cfh_, Key(i), &value);
258 if (i % 5 == 0) {
259 ASSERT_EQ(value, Key(i) + "_overwrite5");
260 } else if (i % 16 == 0) {
261 ASSERT_EQ(value, Key(i) + "_overwrite4");
262 } else if (i % 4 == 0) {
263 ASSERT_EQ(value, Key(i) + "_overwrite2");
264 } else if (i % 2 == 0) {
265 ASSERT_EQ(value, Key(i) + "_overwrite1");
266 } else {
267 ASSERT_EQ(value, Key(i) + "_val");
268 }
269 }
270 }
271
272 TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) {
273 Options options = CurrentOptions();
274 CreateAndReopenWithCF({"koko"}, options);
275
276 for (int i = 0; i < 100; ++i) {
277 ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
278 }
279 ASSERT_OK(Flush(1));
280
281 ASSERT_OK(
282 db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
283
284 // Overwrite the value in the same set of keys.
285 for (int i = 0; i < 100; ++i) {
286 ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite"));
287 }
288
289 // Flush to create L0 file.
290 ASSERT_OK(Flush(1));
291 for (int i = 0; i < 100; ++i) {
292 ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite2"));
293 }
294
295 // Flush again to create another L0 file. It should have higher sequencer.
296 ASSERT_OK(Flush(1));
297
298 Checkpoint* checkpoint;
299 ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
300 ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
301 &metadata_ptr_));
302 ASSERT_NE(metadata_ptr_, nullptr);
303 delete checkpoint;
304
305 ImportColumnFamilyOptions import_options;
306 import_options.move_files = false;
307 ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
308 *metadata_ptr_, &import_cfh_));
309 ASSERT_NE(import_cfh_, nullptr);
310
311 import_options.move_files = true;
312 ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options,
313 *metadata_ptr_, &import_cfh2_));
314 ASSERT_NE(import_cfh2_, nullptr);
315 delete metadata_ptr_;
316 metadata_ptr_ = NULL;
317
318 std::string value1, value2;
319
320 for (int i = 0; i < 100; ++i) {
321 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
322 ASSERT_EQ(Get(1, Key(i)), value1);
323 }
324
325 for (int i = 0; i < 100; ++i) {
326 db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2);
327 ASSERT_EQ(Get(1, Key(i)), value2);
328 }
329
330 // Modify keys in cf1 and verify.
331 for (int i = 0; i < 25; i++) {
332 ASSERT_OK(db_->Delete(WriteOptions(), import_cfh_, Key(i)));
333 }
334 for (int i = 25; i < 50; i++) {
335 ASSERT_OK(
336 db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite3"));
337 }
338 for (int i = 0; i < 25; ++i) {
339 ASSERT_TRUE(
340 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound());
341 }
342 for (int i = 25; i < 50; ++i) {
343 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
344 ASSERT_EQ(Key(i) + "_overwrite3", value1);
345 }
346 for (int i = 50; i < 100; ++i) {
347 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
348 ASSERT_EQ(Key(i) + "_overwrite2", value1);
349 }
350
351 for (int i = 0; i < 100; ++i) {
352 db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2);
353 ASSERT_EQ(Get(1, Key(i)), value2);
354 }
355
356 // Compact and check again.
357 ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_));
358 ASSERT_OK(
359 db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr));
360
361 for (int i = 0; i < 25; ++i) {
362 ASSERT_TRUE(
363 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound());
364 }
365 for (int i = 25; i < 50; ++i) {
366 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
367 ASSERT_EQ(Key(i) + "_overwrite3", value1);
368 }
369 for (int i = 50; i < 100; ++i) {
370 db_->Get(ReadOptions(), import_cfh_, Key(i), &value1);
371 ASSERT_EQ(Key(i) + "_overwrite2", value1);
372 }
373
374 for (int i = 0; i < 100; ++i) {
375 db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2);
376 ASSERT_EQ(Get(1, Key(i)), value2);
377 }
378 }
379
380 TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) {
381 Options options = CurrentOptions();
382 CreateAndReopenWithCF({"koko"}, options);
383
384 for (int i = 0; i < 100; ++i) {
385 ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
386 }
387 ASSERT_OK(Flush(1));
388
389 // Compact to create a L1 file.
390 ASSERT_OK(
391 db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
392
393 // Overwrite the value in the same set of keys.
394 for (int i = 0; i < 50; ++i) {
395 ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite"));
396 }
397
398 // Flush to create L0 file.
399 ASSERT_OK(Flush(1));
400
401 for (int i = 0; i < 25; ++i) {
402 ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite2"));
403 }
404
405 // Flush again to create another L0 file. It should have higher sequencer.
406 ASSERT_OK(Flush(1));
407
408 Checkpoint* checkpoint;
409 ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
410 ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
411 &metadata_ptr_));
412 ASSERT_NE(metadata_ptr_, nullptr);
413 delete checkpoint;
414
415 // Create a new db and import the files.
416 DB* db_copy;
417 ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
418 ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy));
419 ColumnFamilyHandle* cfh = nullptr;
420 ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
421 ImportColumnFamilyOptions(),
422 *metadata_ptr_, &cfh));
423 ASSERT_NE(cfh, nullptr);
424
425 for (int i = 0; i < 100; ++i) {
426 std::string value;
427 db_copy->Get(ReadOptions(), cfh, Key(i), &value);
428 ASSERT_EQ(Get(1, Key(i)), value);
429 }
430 ASSERT_OK(db_copy->DropColumnFamily(cfh));
431 ASSERT_OK(db_copy->DestroyColumnFamilyHandle(cfh));
432 delete db_copy;
433 ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
434 }
435
436 TEST_F(ImportColumnFamilyTest, LevelFilesOverlappingAtEndpoints) {
437 // Imports a column family containing a level where two files overlap at their
438 // endpoints. "Overlap" means the largest user key in one file is the same as
439 // the smallest user key in the second file.
440 const int kFileBytes = 128 << 10; // 128KB
441 const int kValueBytes = 1 << 10; // 1KB
442 const int kNumFiles = 4;
443
444 Options options = CurrentOptions();
445 options.disable_auto_compactions = true;
446 options.num_levels = 2;
447 CreateAndReopenWithCF({"koko"}, options);
448
449 Random rnd(301);
450 // Every key is snapshot protected to ensure older versions will not be
451 // dropped during compaction.
452 std::vector<const Snapshot*> snapshots;
453 snapshots.reserve(kFileBytes / kValueBytes * kNumFiles);
454 for (int i = 0; i < kNumFiles; ++i) {
455 for (int j = 0; j < kFileBytes / kValueBytes; ++j) {
456 auto value = rnd.RandomString(kValueBytes);
457 ASSERT_OK(Put(1, "key", value));
458 snapshots.push_back(db_->GetSnapshot());
459 }
460 ASSERT_OK(Flush(1));
461 }
462
463 // Compact to create overlapping L1 files.
464 ASSERT_OK(
465 db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
466 ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
467
468 Checkpoint* checkpoint;
469 ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
470 ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
471 &metadata_ptr_));
472 ASSERT_NE(metadata_ptr_, nullptr);
473 delete checkpoint;
474
475 // Create a new db and import the files.
476 DB* db_copy;
477 ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
478 ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy));
479 ColumnFamilyHandle* cfh = nullptr;
480 ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
481 ImportColumnFamilyOptions(),
482 *metadata_ptr_, &cfh));
483 ASSERT_NE(cfh, nullptr);
484
485 {
486 std::string value;
487 ASSERT_OK(db_copy->Get(ReadOptions(), cfh, "key", &value));
488 }
489 ASSERT_OK(db_copy->DropColumnFamily(cfh));
490 ASSERT_OK(db_copy->DestroyColumnFamilyHandle(cfh));
491 delete db_copy;
492 ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
493 for (const Snapshot* snapshot : snapshots) {
494 db_->ReleaseSnapshot(snapshot);
495 }
496 }
497
498 TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) {
499 Options options = CurrentOptions();
500 CreateAndReopenWithCF({"koko"}, options);
501
502 {
503 // Create column family with existing cf name.
504 ExportImportFilesMetaData metadata;
505
506 ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "koko",
507 ImportColumnFamilyOptions(),
508 metadata, &import_cfh_),
509 Status::InvalidArgument("Column family already exists"));
510 ASSERT_EQ(import_cfh_, nullptr);
511 }
512
513 {
514 // Import with no files specified.
515 ExportImportFilesMetaData metadata;
516
517 ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
518 ImportColumnFamilyOptions(),
519 metadata, &import_cfh_),
520 Status::InvalidArgument("The list of files is empty"));
521 ASSERT_EQ(import_cfh_, nullptr);
522 }
523
524 {
525 // Import with overlapping keys in sst files.
526 ExportImportFilesMetaData metadata;
527 SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
528 const std::string file1_sst_name = "file1.sst";
529 const std::string file1_sst = sst_files_dir_ + file1_sst_name;
530 ASSERT_OK(sfw_cf1.Open(file1_sst));
531 ASSERT_OK(sfw_cf1.Put("K1", "V1"));
532 ASSERT_OK(sfw_cf1.Put("K2", "V2"));
533 ASSERT_OK(sfw_cf1.Finish());
534 const std::string file2_sst_name = "file2.sst";
535 const std::string file2_sst = sst_files_dir_ + file2_sst_name;
536 ASSERT_OK(sfw_cf1.Open(file2_sst));
537 ASSERT_OK(sfw_cf1.Put("K2", "V2"));
538 ASSERT_OK(sfw_cf1.Put("K3", "V3"));
539 ASSERT_OK(sfw_cf1.Finish());
540
541 metadata.files.push_back(
542 LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19));
543 metadata.files.push_back(
544 LiveFileMetaDataInit(file2_sst_name, sst_files_dir_, 1, 10, 19));
545 metadata.db_comparator_name = options.comparator->Name();
546
547 ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
548 ImportColumnFamilyOptions(),
549 metadata, &import_cfh_),
550 Status::InvalidArgument("Files have overlapping ranges"));
551 ASSERT_EQ(import_cfh_, nullptr);
552 }
553
554 {
555 // Import with a mismatching comparator, should fail with appropriate error.
556 ExportImportFilesMetaData metadata;
557 Options mismatch_options = CurrentOptions();
558 mismatch_options.comparator = ReverseBytewiseComparator();
559 SstFileWriter sfw_cf1(EnvOptions(), mismatch_options, handles_[1]);
560 const std::string file1_sst_name = "file1.sst";
561 const std::string file1_sst = sst_files_dir_ + file1_sst_name;
562 ASSERT_OK(sfw_cf1.Open(file1_sst));
563 ASSERT_OK(sfw_cf1.Put("K2", "V2"));
564 ASSERT_OK(sfw_cf1.Put("K1", "V1"));
565 ASSERT_OK(sfw_cf1.Finish());
566
567 metadata.files.push_back(
568 LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19));
569 metadata.db_comparator_name = mismatch_options.comparator->Name();
570
571 ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "coco",
572 ImportColumnFamilyOptions(),
573 metadata, &import_cfh_),
574 Status::InvalidArgument("Comparator name mismatch"));
575 ASSERT_EQ(import_cfh_, nullptr);
576 }
577
578 {
579 // Import with non existent sst file should fail with appropriate error
580 ExportImportFilesMetaData metadata;
581 SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
582 const std::string file1_sst_name = "file1.sst";
583 const std::string file1_sst = sst_files_dir_ + file1_sst_name;
584 ASSERT_OK(sfw_cf1.Open(file1_sst));
585 ASSERT_OK(sfw_cf1.Put("K1", "V1"));
586 ASSERT_OK(sfw_cf1.Put("K2", "V2"));
587 ASSERT_OK(sfw_cf1.Finish());
588 const std::string file3_sst_name = "file3.sst";
589
590 metadata.files.push_back(
591 LiveFileMetaDataInit(file1_sst_name, sst_files_dir_, 1, 10, 19));
592 metadata.files.push_back(
593 LiveFileMetaDataInit(file3_sst_name, sst_files_dir_, 1, 10, 19));
594 metadata.db_comparator_name = options.comparator->Name();
595
596 ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
597 ImportColumnFamilyOptions(),
598 metadata, &import_cfh_),
599 Status::IOError("No such file or directory"));
600 ASSERT_EQ(import_cfh_, nullptr);
601
602 // Test successful import after a failure with the same CF name. Ensures
603 // there is no side effect with CF when there is a failed import
604 metadata.files.pop_back();
605 metadata.db_comparator_name = options.comparator->Name();
606
607 ASSERT_OK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
608 ImportColumnFamilyOptions(),
609 metadata, &import_cfh_));
610 ASSERT_NE(import_cfh_, nullptr);
611 }
612 }
613
614 } // namespace ROCKSDB_NAMESPACE
615
616 int main(int argc, char** argv) {
617 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
618 ::testing::InitGoogleTest(&argc, argv);
619 return RUN_ALL_TESTS();
620 }
621
622 #else
623 #include <stdio.h>
624
// ROCKSDB_LITE build: nothing to test, so report the skip on stderr and exit
// successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs(
      "SKIPPED as External SST File Writer and Import are not supported "
      "in ROCKSDB_LITE\n",
      stderr);
  return 0;
}
631
632 #endif // !ROCKSDB_LITE