]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/blob_db/blob_db_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_db_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/blob_db/blob_db.h"
9
10 #include <algorithm>
11 #include <chrono>
12 #include <cstdlib>
13 #include <iomanip>
14 #include <map>
15 #include <memory>
16 #include <sstream>
17 #include <string>
18 #include <vector>
19
20 #include "db/blob/blob_index.h"
21 #include "db/db_test_util.h"
22 #include "env/composite_env_wrapper.h"
23 #include "file/file_util.h"
24 #include "file/sst_file_manager_impl.h"
25 #include "port/port.h"
26 #include "rocksdb/utilities/debug.h"
27 #include "test_util/mock_time_env.h"
28 #include "test_util/sync_point.h"
29 #include "test_util/testharness.h"
30 #include "util/random.h"
31 #include "util/string_util.h"
32 #include "utilities/blob_db/blob_db_impl.h"
33 #include "utilities/fault_injection_env.h"
34
35 namespace ROCKSDB_NAMESPACE {
36 namespace blob_db {
37
38 class BlobDBTest : public testing::Test {
39 public:
40 const int kMaxBlobSize = 1 << 14;
41
42 struct BlobIndexVersion {
43 BlobIndexVersion() = default;
44 BlobIndexVersion(std::string _user_key, uint64_t _file_number,
45 uint64_t _expiration, SequenceNumber _sequence,
46 ValueType _type)
47 : user_key(std::move(_user_key)),
48 file_number(_file_number),
49 expiration(_expiration),
50 sequence(_sequence),
51 type(_type) {}
52
53 std::string user_key;
54 uint64_t file_number = kInvalidBlobFileNumber;
55 uint64_t expiration = kNoExpiration;
56 SequenceNumber sequence = 0;
57 ValueType type = kTypeValue;
58 };
59
60 BlobDBTest()
61 : dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) {
62 mock_clock_ = std::make_shared<MockSystemClock>(SystemClock::Default());
63 mock_env_.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_));
64 fault_injection_env_.reset(new FaultInjectionTestEnv(Env::Default()));
65
66 Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
67 assert(s.ok());
68 }
69
70 ~BlobDBTest() override {
71 SyncPoint::GetInstance()->ClearAllCallBacks();
72 Destroy();
73 }
74
75 Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
76 Options options = Options()) {
77 options.create_if_missing = true;
78 if (options.env == mock_env_.get()) {
79 // Need to disable stats dumping and persisting which also use
80 // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
81 // With mocked time, this can hang on some platforms (MacOS)
82 // because (a) on some platforms, pthread_cond_timedwait does not appear
83 // to release the lock for other threads to operate if the deadline time
84 // is already passed, and (b) TimedWait calls are currently a bad
85 // abstraction because the deadline parameter is usually computed from
86 // Env time, but is interpreted in real clock time.
87 options.stats_dump_period_sec = 0;
88 options.stats_persist_period_sec = 0;
89 }
90 return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
91 }
92
93 void Open(BlobDBOptions bdb_options = BlobDBOptions(),
94 Options options = Options()) {
95 ASSERT_OK(TryOpen(bdb_options, options));
96 }
97
98 void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
99 Options options = Options()) {
100 assert(blob_db_ != nullptr);
101 delete blob_db_;
102 blob_db_ = nullptr;
103 Open(bdb_options, options);
104 }
105
106 void Close() {
107 assert(blob_db_ != nullptr);
108 delete blob_db_;
109 blob_db_ = nullptr;
110 }
111
112 void Destroy() {
113 if (blob_db_) {
114 Options options = blob_db_->GetOptions();
115 BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
116 delete blob_db_;
117 blob_db_ = nullptr;
118 ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
119 }
120 }
121
122 BlobDBImpl *blob_db_impl() {
123 return reinterpret_cast<BlobDBImpl *>(blob_db_);
124 }
125
126 Status Put(const Slice &key, const Slice &value,
127 std::map<std::string, std::string> *data = nullptr) {
128 Status s = blob_db_->Put(WriteOptions(), key, value);
129 if (data != nullptr) {
130 (*data)[key.ToString()] = value.ToString();
131 }
132 return s;
133 }
134
135 void Delete(const std::string &key,
136 std::map<std::string, std::string> *data = nullptr) {
137 ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
138 if (data != nullptr) {
139 data->erase(key);
140 }
141 }
142
143 Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
144 std::map<std::string, std::string> *data = nullptr) {
145 Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
146 if (data != nullptr) {
147 (*data)[key.ToString()] = value.ToString();
148 }
149 return s;
150 }
151
152 Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
153 return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
154 }
155
156 void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
157 std::map<std::string, std::string> *data = nullptr) {
158 int len = rnd->Next() % kMaxBlobSize + 1;
159 std::string value = rnd->HumanReadableString(len);
160 ASSERT_OK(
161 blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
162 if (data != nullptr) {
163 (*data)[key] = value;
164 }
165 }
166
167 void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
168 std::map<std::string, std::string> *data = nullptr) {
169 int len = rnd->Next() % kMaxBlobSize + 1;
170 std::string value = rnd->HumanReadableString(len);
171 ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
172 expiration));
173 if (data != nullptr) {
174 (*data)[key] = value;
175 }
176 }
177
178 void PutRandom(const std::string &key, Random *rnd,
179 std::map<std::string, std::string> *data = nullptr) {
180 PutRandom(blob_db_, key, rnd, data);
181 }
182
183 void PutRandom(DB *db, const std::string &key, Random *rnd,
184 std::map<std::string, std::string> *data = nullptr) {
185 int len = rnd->Next() % kMaxBlobSize + 1;
186 std::string value = rnd->HumanReadableString(len);
187 ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
188 if (data != nullptr) {
189 (*data)[key] = value;
190 }
191 }
192
193 void PutRandomToWriteBatch(
194 const std::string &key, Random *rnd, WriteBatch *batch,
195 std::map<std::string, std::string> *data = nullptr) {
196 int len = rnd->Next() % kMaxBlobSize + 1;
197 std::string value = rnd->HumanReadableString(len);
198 ASSERT_OK(batch->Put(key, value));
199 if (data != nullptr) {
200 (*data)[key] = value;
201 }
202 }
203
204 // Verify blob db contain expected data and nothing more.
205 void VerifyDB(const std::map<std::string, std::string> &data) {
206 VerifyDB(blob_db_, data);
207 }
208
209 void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
210 // Verify normal Get
211 auto *cfh = db->DefaultColumnFamily();
212 for (auto &p : data) {
213 PinnableSlice value_slice;
214 ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
215 ASSERT_EQ(p.second, value_slice.ToString());
216 std::string value;
217 ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
218 ASSERT_EQ(p.second, value);
219 }
220
221 // Verify iterators
222 Iterator *iter = db->NewIterator(ReadOptions());
223 iter->SeekToFirst();
224 for (auto &p : data) {
225 ASSERT_TRUE(iter->Valid());
226 ASSERT_EQ(p.first, iter->key().ToString());
227 ASSERT_EQ(p.second, iter->value().ToString());
228 iter->Next();
229 }
230 ASSERT_FALSE(iter->Valid());
231 ASSERT_OK(iter->status());
232 delete iter;
233 }
234
235 void VerifyBaseDB(
236 const std::map<std::string, KeyVersion> &expected_versions) {
237 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
238 DB *db = blob_db_->GetRootDB();
239 const size_t kMaxKeys = 10000;
240 std::vector<KeyVersion> versions;
241 ASSERT_OK(GetAllKeyVersions(db, "", "", kMaxKeys, &versions));
242 ASSERT_EQ(expected_versions.size(), versions.size());
243 size_t i = 0;
244 for (auto &key_version : expected_versions) {
245 const KeyVersion &expected_version = key_version.second;
246 ASSERT_EQ(expected_version.user_key, versions[i].user_key);
247 ASSERT_EQ(expected_version.sequence, versions[i].sequence);
248 ASSERT_EQ(expected_version.type, versions[i].type);
249 if (versions[i].type == kTypeValue) {
250 ASSERT_EQ(expected_version.value, versions[i].value);
251 } else {
252 ASSERT_EQ(kTypeBlobIndex, versions[i].type);
253 PinnableSlice value;
254 ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
255 versions[i].value, &value));
256 ASSERT_EQ(expected_version.value, value.ToString());
257 }
258 i++;
259 }
260 }
261
262 void VerifyBaseDBBlobIndex(
263 const std::map<std::string, BlobIndexVersion> &expected_versions) {
264 const size_t kMaxKeys = 10000;
265 std::vector<KeyVersion> versions;
266 ASSERT_OK(
267 GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
268 ASSERT_EQ(versions.size(), expected_versions.size());
269
270 size_t i = 0;
271 for (const auto &expected_pair : expected_versions) {
272 const BlobIndexVersion &expected_version = expected_pair.second;
273
274 ASSERT_EQ(versions[i].user_key, expected_version.user_key);
275 ASSERT_EQ(versions[i].sequence, expected_version.sequence);
276 ASSERT_EQ(versions[i].type, expected_version.type);
277 if (versions[i].type != kTypeBlobIndex) {
278 ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
279 ASSERT_EQ(kNoExpiration, expected_version.expiration);
280
281 ++i;
282 continue;
283 }
284
285 BlobIndex blob_index;
286 ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
287
288 const uint64_t file_number = !blob_index.IsInlined()
289 ? blob_index.file_number()
290 : kInvalidBlobFileNumber;
291 ASSERT_EQ(file_number, expected_version.file_number);
292
293 const uint64_t expiration =
294 blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
295 ASSERT_EQ(expiration, expected_version.expiration);
296
297 ++i;
298 }
299 }
300
301 void InsertBlobs() {
302 WriteOptions wo;
303 std::string value;
304
305 Random rnd(301);
306 for (size_t i = 0; i < 100000; i++) {
307 uint64_t ttl = rnd.Next() % 86400;
308 PutRandomWithTTL("key" + std::to_string(i % 500), ttl, &rnd, nullptr);
309 }
310
311 for (size_t i = 0; i < 10; i++) {
312 Delete("key" + std::to_string(i % 500));
313 }
314 }
315
316 const std::string dbname_;
317 std::shared_ptr<MockSystemClock> mock_clock_;
318 std::unique_ptr<Env> mock_env_;
319 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
320 BlobDB *blob_db_;
321 }; // class BlobDBTest
322
323 TEST_F(BlobDBTest, Put) {
324 Random rnd(301);
325 BlobDBOptions bdb_options;
326 bdb_options.min_blob_size = 0;
327 bdb_options.disable_background_tasks = true;
328 Open(bdb_options);
329 std::map<std::string, std::string> data;
330 for (size_t i = 0; i < 100; i++) {
331 PutRandom("key" + std::to_string(i), &rnd, &data);
332 }
333 VerifyDB(data);
334 }
335
336 TEST_F(BlobDBTest, PutWithTTL) {
337 Random rnd(301);
338 Options options;
339 options.env = mock_env_.get();
340 BlobDBOptions bdb_options;
341 bdb_options.ttl_range_secs = 1000;
342 bdb_options.min_blob_size = 0;
343 bdb_options.blob_file_size = 256 * 1000 * 1000;
344 bdb_options.disable_background_tasks = true;
345 Open(bdb_options, options);
346 std::map<std::string, std::string> data;
347 mock_clock_->SetCurrentTime(50);
348 for (size_t i = 0; i < 100; i++) {
349 uint64_t ttl = rnd.Next() % 100;
350 PutRandomWithTTL("key" + std::to_string(i), ttl, &rnd,
351 (ttl <= 50 ? nullptr : &data));
352 }
353 mock_clock_->SetCurrentTime(100);
354 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
355 auto blob_files = bdb_impl->TEST_GetBlobFiles();
356 ASSERT_EQ(1, blob_files.size());
357 ASSERT_TRUE(blob_files[0]->HasTTL());
358 ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
359 VerifyDB(data);
360 }
361
362 TEST_F(BlobDBTest, PutUntil) {
363 Random rnd(301);
364 Options options;
365 options.env = mock_env_.get();
366 BlobDBOptions bdb_options;
367 bdb_options.ttl_range_secs = 1000;
368 bdb_options.min_blob_size = 0;
369 bdb_options.blob_file_size = 256 * 1000 * 1000;
370 bdb_options.disable_background_tasks = true;
371 Open(bdb_options, options);
372 std::map<std::string, std::string> data;
373 mock_clock_->SetCurrentTime(50);
374 for (size_t i = 0; i < 100; i++) {
375 uint64_t expiration = rnd.Next() % 100 + 50;
376 PutRandomUntil("key" + std::to_string(i), expiration, &rnd,
377 (expiration <= 100 ? nullptr : &data));
378 }
379 mock_clock_->SetCurrentTime(100);
380 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
381 auto blob_files = bdb_impl->TEST_GetBlobFiles();
382 ASSERT_EQ(1, blob_files.size());
383 ASSERT_TRUE(blob_files[0]->HasTTL());
384 ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
385 VerifyDB(data);
386 }
387
388 TEST_F(BlobDBTest, StackableDBGet) {
389 Random rnd(301);
390 BlobDBOptions bdb_options;
391 bdb_options.min_blob_size = 0;
392 bdb_options.disable_background_tasks = true;
393 Open(bdb_options);
394 std::map<std::string, std::string> data;
395 for (size_t i = 0; i < 100; i++) {
396 PutRandom("key" + std::to_string(i), &rnd, &data);
397 }
398 for (size_t i = 0; i < 100; i++) {
399 StackableDB *db = blob_db_;
400 ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
401 std::string key = "key" + std::to_string(i);
402 PinnableSlice pinnable_value;
403 ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
404 std::string string_value;
405 ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
406 ASSERT_EQ(string_value, pinnable_value.ToString());
407 ASSERT_EQ(string_value, data[key]);
408 }
409 }
410
411 TEST_F(BlobDBTest, GetExpiration) {
412 Options options;
413 options.env = mock_env_.get();
414 BlobDBOptions bdb_options;
415 bdb_options.disable_background_tasks = true;
416 mock_clock_->SetCurrentTime(100);
417 Open(bdb_options, options);
418 ASSERT_OK(Put("key1", "value1"));
419 ASSERT_OK(PutWithTTL("key2", "value2", 200));
420 PinnableSlice value;
421 uint64_t expiration;
422 ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
423 ASSERT_EQ("value1", value.ToString());
424 ASSERT_EQ(kNoExpiration, expiration);
425 ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
426 ASSERT_EQ("value2", value.ToString());
427 ASSERT_EQ(300 /* = 100 + 200 */, expiration);
428 }
429
430 TEST_F(BlobDBTest, GetIOError) {
431 Options options;
432 options.env = fault_injection_env_.get();
433 BlobDBOptions bdb_options;
434 bdb_options.min_blob_size = 0; // Make sure value write to blob file
435 bdb_options.disable_background_tasks = true;
436 Open(bdb_options, options);
437 ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
438 PinnableSlice value;
439 ASSERT_OK(Put("foo", "bar"));
440 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
441 Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
442 ASSERT_TRUE(s.IsIOError());
443 // Reactivate file system to allow test to close DB.
444 fault_injection_env_->SetFilesystemActive(true);
445 }
446
447 TEST_F(BlobDBTest, PutIOError) {
448 Options options;
449 options.env = fault_injection_env_.get();
450 BlobDBOptions bdb_options;
451 bdb_options.min_blob_size = 0; // Make sure value write to blob file
452 bdb_options.disable_background_tasks = true;
453 Open(bdb_options, options);
454 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
455 ASSERT_TRUE(Put("foo", "v1").IsIOError());
456 fault_injection_env_->SetFilesystemActive(true, Status::IOError());
457 ASSERT_OK(Put("bar", "v1"));
458 }
459
460 TEST_F(BlobDBTest, WriteBatch) {
461 Random rnd(301);
462 BlobDBOptions bdb_options;
463 bdb_options.min_blob_size = 0;
464 bdb_options.disable_background_tasks = true;
465 Open(bdb_options);
466 std::map<std::string, std::string> data;
467 for (size_t i = 0; i < 100; i++) {
468 WriteBatch batch;
469 for (size_t j = 0; j < 10; j++) {
470 PutRandomToWriteBatch("key" + std::to_string(j * 100 + i), &rnd, &batch,
471 &data);
472 }
473
474 ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
475 }
476 VerifyDB(data);
477 }
478
479 TEST_F(BlobDBTest, Delete) {
480 Random rnd(301);
481 BlobDBOptions bdb_options;
482 bdb_options.min_blob_size = 0;
483 bdb_options.disable_background_tasks = true;
484 Open(bdb_options);
485 std::map<std::string, std::string> data;
486 for (size_t i = 0; i < 100; i++) {
487 PutRandom("key" + std::to_string(i), &rnd, &data);
488 }
489 for (size_t i = 0; i < 100; i += 5) {
490 Delete("key" + std::to_string(i), &data);
491 }
492 VerifyDB(data);
493 }
494
495 TEST_F(BlobDBTest, DeleteBatch) {
496 Random rnd(301);
497 BlobDBOptions bdb_options;
498 bdb_options.min_blob_size = 0;
499 bdb_options.disable_background_tasks = true;
500 Open(bdb_options);
501 for (size_t i = 0; i < 100; i++) {
502 PutRandom("key" + std::to_string(i), &rnd);
503 }
504 WriteBatch batch;
505 for (size_t i = 0; i < 100; i++) {
506 ASSERT_OK(batch.Delete("key" + std::to_string(i)));
507 }
508 ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
509 // DB should be empty.
510 VerifyDB({});
511 }
512
513 TEST_F(BlobDBTest, Override) {
514 Random rnd(301);
515 BlobDBOptions bdb_options;
516 bdb_options.min_blob_size = 0;
517 bdb_options.disable_background_tasks = true;
518 Open(bdb_options);
519 std::map<std::string, std::string> data;
520 for (int i = 0; i < 10000; i++) {
521 PutRandom("key" + std::to_string(i), &rnd, nullptr);
522 }
523 // override all the keys
524 for (int i = 0; i < 10000; i++) {
525 PutRandom("key" + std::to_string(i), &rnd, &data);
526 }
527 VerifyDB(data);
528 }
529
530 #ifdef SNAPPY
531 TEST_F(BlobDBTest, Compression) {
532 Random rnd(301);
533 BlobDBOptions bdb_options;
534 bdb_options.min_blob_size = 0;
535 bdb_options.disable_background_tasks = true;
536 bdb_options.compression = CompressionType::kSnappyCompression;
537 Open(bdb_options);
538 std::map<std::string, std::string> data;
539 for (size_t i = 0; i < 100; i++) {
540 PutRandom("put-key" + std::to_string(i), &rnd, &data);
541 }
542 for (int i = 0; i < 100; i++) {
543 WriteBatch batch;
544 for (size_t j = 0; j < 10; j++) {
545 PutRandomToWriteBatch("write-batch-key" + std::to_string(j * 100 + i),
546 &rnd, &batch, &data);
547 }
548 ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
549 }
550 VerifyDB(data);
551 }
552
553 TEST_F(BlobDBTest, DecompressAfterReopen) {
554 Random rnd(301);
555 BlobDBOptions bdb_options;
556 bdb_options.min_blob_size = 0;
557 bdb_options.disable_background_tasks = true;
558 bdb_options.compression = CompressionType::kSnappyCompression;
559 Open(bdb_options);
560 std::map<std::string, std::string> data;
561 for (size_t i = 0; i < 100; i++) {
562 PutRandom("put-key" + std::to_string(i), &rnd, &data);
563 }
564 VerifyDB(data);
565 bdb_options.compression = CompressionType::kNoCompression;
566 Reopen(bdb_options);
567 VerifyDB(data);
568 }
569
570 TEST_F(BlobDBTest, EnableDisableCompressionGC) {
571 Random rnd(301);
572 BlobDBOptions bdb_options;
573 bdb_options.min_blob_size = 0;
574 bdb_options.garbage_collection_cutoff = 1.0;
575 bdb_options.disable_background_tasks = true;
576 bdb_options.compression = kSnappyCompression;
577 Open(bdb_options);
578 std::map<std::string, std::string> data;
579 size_t data_idx = 0;
580 for (; data_idx < 100; data_idx++) {
581 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
582 }
583 VerifyDB(data);
584 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
585 ASSERT_EQ(1, blob_files.size());
586 ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType());
587
588 // disable compression
589 bdb_options.compression = kNoCompression;
590 Reopen(bdb_options);
591
592 // Add more data with new compression type
593 for (; data_idx < 200; data_idx++) {
594 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
595 }
596 VerifyDB(data);
597
598 blob_files = blob_db_impl()->TEST_GetBlobFiles();
599 ASSERT_EQ(2, blob_files.size());
600 ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType());
601
602 // Enable GC. If we do it earlier the snapshot release triggered compaction
603 // may compact files and trigger GC before we can verify there are two files.
604 bdb_options.enable_garbage_collection = true;
605 Reopen(bdb_options);
606
607 // Trigger compaction
608 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
609 blob_db_impl()->TEST_DeleteObsoleteFiles();
610 VerifyDB(data);
611
612 blob_files = blob_db_impl()->TEST_GetBlobFiles();
613 for (auto bfile : blob_files) {
614 ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
615 }
616
617 // enabling the compression again
618 bdb_options.compression = kSnappyCompression;
619 Reopen(bdb_options);
620
621 // Add more data with new compression type
622 for (; data_idx < 300; data_idx++) {
623 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
624 }
625 VerifyDB(data);
626
627 // Trigger compaction
628 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
629 blob_db_impl()->TEST_DeleteObsoleteFiles();
630 VerifyDB(data);
631
632 blob_files = blob_db_impl()->TEST_GetBlobFiles();
633 for (auto bfile : blob_files) {
634 ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
635 }
636 }
637
638 #ifdef LZ4
639 // Test switch compression types and run GC, it needs both Snappy and LZ4
640 // support.
641 TEST_F(BlobDBTest, ChangeCompressionGC) {
642 Random rnd(301);
643 BlobDBOptions bdb_options;
644 bdb_options.min_blob_size = 0;
645 bdb_options.garbage_collection_cutoff = 1.0;
646 bdb_options.disable_background_tasks = true;
647 bdb_options.compression = kLZ4Compression;
648 Open(bdb_options);
649 std::map<std::string, std::string> data;
650 size_t data_idx = 0;
651 for (; data_idx < 100; data_idx++) {
652 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
653 }
654 VerifyDB(data);
655 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
656 ASSERT_EQ(1, blob_files.size());
657 ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType());
658
659 // Change compression type
660 bdb_options.compression = kSnappyCompression;
661 Reopen(bdb_options);
662
663 // Add more data with Snappy compression type
664 for (; data_idx < 200; data_idx++) {
665 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
666 }
667 VerifyDB(data);
668
669 // Verify blob file compression type
670 blob_files = blob_db_impl()->TEST_GetBlobFiles();
671 ASSERT_EQ(2, blob_files.size());
672 ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType());
673
674 // Enable GC. If we do it earlier the snapshot release triggered compaction
675 // may compact files and trigger GC before we can verify there are two files.
676 bdb_options.enable_garbage_collection = true;
677 Reopen(bdb_options);
678
679 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
680 VerifyDB(data);
681
682 blob_db_impl()->TEST_DeleteObsoleteFiles();
683 blob_files = blob_db_impl()->TEST_GetBlobFiles();
684 for (auto bfile : blob_files) {
685 ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
686 }
687
688 // Disable compression
689 bdb_options.compression = kNoCompression;
690 Reopen(bdb_options);
691 for (; data_idx < 300; data_idx++) {
692 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
693 }
694 VerifyDB(data);
695
696 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
697 VerifyDB(data);
698
699 blob_db_impl()->TEST_DeleteObsoleteFiles();
700 blob_files = blob_db_impl()->TEST_GetBlobFiles();
701 for (auto bfile : blob_files) {
702 ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
703 }
704
705 // switching different compression types to generate mixed compression types
706 bdb_options.compression = kSnappyCompression;
707 Reopen(bdb_options);
708 for (; data_idx < 400; data_idx++) {
709 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
710 }
711 VerifyDB(data);
712
713 bdb_options.compression = kLZ4Compression;
714 Reopen(bdb_options);
715 for (; data_idx < 500; data_idx++) {
716 PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
717 }
718 VerifyDB(data);
719
720 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
721 VerifyDB(data);
722
723 blob_db_impl()->TEST_DeleteObsoleteFiles();
724 blob_files = blob_db_impl()->TEST_GetBlobFiles();
725 for (auto bfile : blob_files) {
726 ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType());
727 }
728 }
729 #endif // LZ4
730 #endif // SNAPPY
731
732 TEST_F(BlobDBTest, MultipleWriters) {
733 Open(BlobDBOptions());
734
735 std::vector<port::Thread> workers;
736 std::vector<std::map<std::string, std::string>> data_set(10);
737 for (uint32_t i = 0; i < 10; i++)
738 workers.push_back(port::Thread(
739 [&](uint32_t id) {
740 Random rnd(301 + id);
741 for (int j = 0; j < 100; j++) {
742 std::string key =
743 "key" + std::to_string(id) + "_" + std::to_string(j);
744 if (id < 5) {
745 PutRandom(key, &rnd, &data_set[id]);
746 } else {
747 WriteBatch batch;
748 PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
749 ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
750 }
751 }
752 },
753 i));
754 std::map<std::string, std::string> data;
755 for (size_t i = 0; i < 10; i++) {
756 workers[i].join();
757 data.insert(data_set[i].begin(), data_set[i].end());
758 }
759 VerifyDB(data);
760 }
761
762 TEST_F(BlobDBTest, SstFileManager) {
763 // run the same test for Get(), MultiGet() and Iterator each.
764 std::shared_ptr<SstFileManager> sst_file_manager(
765 NewSstFileManager(mock_env_.get()));
766 sst_file_manager->SetDeleteRateBytesPerSecond(1);
767 SstFileManagerImpl *sfm =
768 static_cast<SstFileManagerImpl *>(sst_file_manager.get());
769
770 BlobDBOptions bdb_options;
771 bdb_options.min_blob_size = 0;
772 bdb_options.enable_garbage_collection = true;
773 bdb_options.garbage_collection_cutoff = 1.0;
774 Options db_options;
775
776 int files_scheduled_to_delete = 0;
777 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
778 "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
779 assert(arg);
780 const std::string *const file_path =
781 static_cast<const std::string *>(arg);
782 if (file_path->find(".blob") != std::string::npos) {
783 ++files_scheduled_to_delete;
784 }
785 });
786 SyncPoint::GetInstance()->EnableProcessing();
787 db_options.sst_file_manager = sst_file_manager;
788
789 Open(bdb_options, db_options);
790
791 // Create one obselete file and clean it.
792 ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
793 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
794 ASSERT_EQ(1, blob_files.size());
795 std::shared_ptr<BlobFile> bfile = blob_files[0];
796 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
797 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
798 blob_db_impl()->TEST_DeleteObsoleteFiles();
799
800 // Even if SSTFileManager is not set, DB is creating a dummy one.
801 ASSERT_EQ(1, files_scheduled_to_delete);
802 Destroy();
803 // Make sure that DestroyBlobDB() also goes through delete scheduler.
804 ASSERT_EQ(2, files_scheduled_to_delete);
805 SyncPoint::GetInstance()->DisableProcessing();
806 sfm->WaitForEmptyTrash();
807 }
808
809 TEST_F(BlobDBTest, SstFileManagerRestart) {
810 int files_scheduled_to_delete = 0;
811 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
812 "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
813 assert(arg);
814 const std::string *const file_path =
815 static_cast<const std::string *>(arg);
816 if (file_path->find(".blob") != std::string::npos) {
817 ++files_scheduled_to_delete;
818 }
819 });
820
821 // run the same test for Get(), MultiGet() and Iterator each.
822 std::shared_ptr<SstFileManager> sst_file_manager(
823 NewSstFileManager(mock_env_.get()));
824 sst_file_manager->SetDeleteRateBytesPerSecond(1);
825 SstFileManagerImpl *sfm =
826 static_cast<SstFileManagerImpl *>(sst_file_manager.get());
827
828 BlobDBOptions bdb_options;
829 bdb_options.min_blob_size = 0;
830 Options db_options;
831
832 SyncPoint::GetInstance()->EnableProcessing();
833 db_options.sst_file_manager = sst_file_manager;
834
835 Open(bdb_options, db_options);
836 std::string blob_dir = blob_db_impl()->TEST_blob_dir();
837 ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
838 Close();
839
840 // Create 3 dummy trash files under the blob_dir
841 const auto &fs = db_options.env->GetFileSystem();
842 ASSERT_OK(CreateFile(fs, blob_dir + "/000666.blob.trash", "", false));
843 ASSERT_OK(CreateFile(fs, blob_dir + "/000888.blob.trash", "", true));
844 ASSERT_OK(CreateFile(fs, blob_dir + "/something_not_match.trash", "", false));
845
846 // Make sure that reopening the DB rescan the existing trash files
847 Open(bdb_options, db_options);
848 ASSERT_EQ(files_scheduled_to_delete, 2);
849
850 sfm->WaitForEmptyTrash();
851
852 // There should be exact one file under the blob dir now.
853 std::vector<std::string> all_files;
854 ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
855 int nfiles = 0;
856 for (const auto &f : all_files) {
857 assert(!f.empty());
858 if (f[0] == '.') {
859 continue;
860 }
861 nfiles++;
862 }
863 ASSERT_EQ(nfiles, 1);
864
865 SyncPoint::GetInstance()->DisableProcessing();
866 }
867
868 TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
869 BlobDBOptions bdb_options;
870 bdb_options.min_blob_size = 0;
871 bdb_options.enable_garbage_collection = true;
872 bdb_options.garbage_collection_cutoff = 1.0;
873 bdb_options.disable_background_tasks = true;
874
875 Options options;
876 options.disable_auto_compactions = true;
877
878 // i = when to take snapshot
879 for (int i = 0; i < 4; i++) {
880 Destroy();
881 Open(bdb_options, options);
882
883 const Snapshot *snapshot = nullptr;
884
885 // First file
886 ASSERT_OK(Put("key1", "value"));
887 if (i == 0) {
888 snapshot = blob_db_->GetSnapshot();
889 }
890
891 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
892 ASSERT_EQ(1, blob_files.size());
893 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
894
895 // Second file
896 ASSERT_OK(Put("key2", "value"));
897 if (i == 1) {
898 snapshot = blob_db_->GetSnapshot();
899 }
900
901 blob_files = blob_db_impl()->TEST_GetBlobFiles();
902 ASSERT_EQ(2, blob_files.size());
903 auto bfile = blob_files[1];
904 ASSERT_FALSE(bfile->Immutable());
905 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
906
907 // Third file
908 ASSERT_OK(Put("key3", "value"));
909 if (i == 2) {
910 snapshot = blob_db_->GetSnapshot();
911 }
912
913 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
914 ASSERT_TRUE(bfile->Obsolete());
915 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
916 bfile->GetObsoleteSequence());
917
918 Delete("key2");
919 if (i == 3) {
920 snapshot = blob_db_->GetSnapshot();
921 }
922
923 ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
924 blob_db_impl()->TEST_DeleteObsoleteFiles();
925
926 if (i >= 2) {
927 // The snapshot shouldn't see data in bfile
928 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
929 blob_db_->ReleaseSnapshot(snapshot);
930 } else {
931 // The snapshot will see data in bfile, so the file shouldn't be deleted
932 ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
933 blob_db_->ReleaseSnapshot(snapshot);
934 blob_db_impl()->TEST_DeleteObsoleteFiles();
935 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
936 }
937 }
938 }
939
940 TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
941 Options options;
942 options.env = mock_env_.get();
943 mock_clock_->SetCurrentTime(0);
944 Open(BlobDBOptions(), options);
945 ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
946 ColumnFamilyHandle *handle = nullptr;
947 std::string value;
948 std::vector<std::string> values;
949 // The call simply pass through to base db. It should succeed.
950 ASSERT_OK(
951 blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
952 ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
953 ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
954 .IsNotSupported());
955 ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
956 .IsNotSupported());
957 WriteBatch batch;
958 ASSERT_OK(batch.Put("k1", "v1"));
959 ASSERT_OK(batch.Put(handle, "k2", "v2"));
960 ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
961 ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
962 ASSERT_TRUE(
963 blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
964 auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
965 {"k1", "k2"}, &values);
966 ASSERT_EQ(2, statuses.size());
967 ASSERT_TRUE(statuses[0].IsNotSupported());
968 ASSERT_TRUE(statuses[1].IsNotSupported());
969 ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
970 delete handle;
971 }
972
973 TEST_F(BlobDBTest, GetLiveFilesMetaData) {
974 Random rnd(301);
975
976 BlobDBOptions bdb_options;
977 bdb_options.blob_dir = "blob_dir";
978 bdb_options.path_relative = true;
979 bdb_options.ttl_range_secs = 10;
980 bdb_options.min_blob_size = 0;
981 bdb_options.disable_background_tasks = true;
982
983 Options options;
984 options.env = mock_env_.get();
985
986 Open(bdb_options, options);
987
988 std::map<std::string, std::string> data;
989 for (size_t i = 0; i < 100; i++) {
990 PutRandom("key" + std::to_string(i), &rnd, &data);
991 }
992
993 constexpr uint64_t expiration = 1000ULL;
994 PutRandomUntil("key100", expiration, &rnd, &data);
995
996 std::vector<LiveFileMetaData> metadata;
997 blob_db_->GetLiveFilesMetaData(&metadata);
998
999 ASSERT_EQ(2U, metadata.size());
1000 // Path should be relative to db_name, but begin with slash.
1001 const std::string filename1("/blob_dir/000001.blob");
1002 ASSERT_EQ(filename1, metadata[0].name);
1003 ASSERT_EQ(1, metadata[0].file_number);
1004 ASSERT_EQ(0, metadata[0].oldest_ancester_time);
1005 ASSERT_EQ(kDefaultColumnFamilyName, metadata[0].column_family_name);
1006
1007 const std::string filename2("/blob_dir/000002.blob");
1008 ASSERT_EQ(filename2, metadata[1].name);
1009 ASSERT_EQ(2, metadata[1].file_number);
1010 ASSERT_EQ(expiration, metadata[1].oldest_ancester_time);
1011 ASSERT_EQ(kDefaultColumnFamilyName, metadata[1].column_family_name);
1012
1013 std::vector<std::string> livefile;
1014 uint64_t mfs;
1015 ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
1016 ASSERT_EQ(5U, livefile.size());
1017 ASSERT_EQ(filename1, livefile[3]);
1018 ASSERT_EQ(filename2, livefile[4]);
1019 VerifyDB(data);
1020 }
1021
1022 TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
1023 constexpr size_t kNumKey = 20;
1024 constexpr size_t kNumIteration = 10;
1025 Random rnd(301);
1026 std::map<std::string, std::string> data;
1027 std::vector<bool> is_blob(kNumKey, false);
1028
1029 // Write to plain rocksdb.
1030 Options options;
1031 options.create_if_missing = true;
1032 DB *db = nullptr;
1033 ASSERT_OK(DB::Open(options, dbname_, &db));
1034 for (size_t i = 0; i < kNumIteration; i++) {
1035 auto key_index = rnd.Next() % kNumKey;
1036 std::string key = "key" + std::to_string(key_index);
1037 PutRandom(db, key, &rnd, &data);
1038 }
1039 VerifyDB(db, data);
1040 delete db;
1041 db = nullptr;
1042
1043 // Open as blob db. Verify it can read existing data.
1044 Open();
1045 VerifyDB(blob_db_, data);
1046 for (size_t i = 0; i < kNumIteration; i++) {
1047 auto key_index = rnd.Next() % kNumKey;
1048 std::string key = "key" + std::to_string(key_index);
1049 is_blob[key_index] = true;
1050 PutRandom(blob_db_, key, &rnd, &data);
1051 }
1052 VerifyDB(blob_db_, data);
1053 delete blob_db_;
1054 blob_db_ = nullptr;
1055
1056 // Verify plain db return error for keys written by blob db.
1057 ASSERT_OK(DB::Open(options, dbname_, &db));
1058 std::string value;
1059 for (size_t i = 0; i < kNumKey; i++) {
1060 std::string key = "key" + std::to_string(i);
1061 Status s = db->Get(ReadOptions(), key, &value);
1062 if (data.count(key) == 0) {
1063 ASSERT_TRUE(s.IsNotFound());
1064 } else if (is_blob[i]) {
1065 ASSERT_TRUE(s.IsCorruption());
1066 } else {
1067 ASSERT_OK(s);
1068 ASSERT_EQ(data[key], value);
1069 }
1070 }
1071 delete db;
1072 }
1073
1074 // Test to verify that a NoSpace IOError Status is returned on reaching
1075 // max_db_size limit.
1076 TEST_F(BlobDBTest, OutOfSpace) {
1077 // Use mock env to stop wall clock.
1078 Options options;
1079 options.env = mock_env_.get();
1080 BlobDBOptions bdb_options;
1081 bdb_options.max_db_size = 200;
1082 bdb_options.is_fifo = false;
1083 bdb_options.disable_background_tasks = true;
1084 Open(bdb_options);
1085
1086 // Each stored blob has an overhead of about 42 bytes currently.
1087 // So a small key + a 100 byte blob should take up ~150 bytes in the db.
1088 std::string value(100, 'v');
1089 ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
1090
1091 // Putting another blob should fail as ading it would exceed the max_db_size
1092 // limit.
1093 Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
1094 ASSERT_TRUE(s.IsIOError());
1095 ASSERT_TRUE(s.IsNoSpace());
1096 }
1097
1098 TEST_F(BlobDBTest, FIFOEviction) {
1099 BlobDBOptions bdb_options;
1100 bdb_options.max_db_size = 200;
1101 bdb_options.blob_file_size = 100;
1102 bdb_options.is_fifo = true;
1103 bdb_options.disable_background_tasks = true;
1104 Open(bdb_options);
1105
1106 std::atomic<int> evict_count{0};
1107 SyncPoint::GetInstance()->SetCallBack(
1108 "BlobDBImpl::EvictOldestBlobFile:Evicted",
1109 [&](void *) { evict_count++; });
1110 SyncPoint::GetInstance()->EnableProcessing();
1111
1112 // Each stored blob has an overhead of 32 bytes currently.
1113 // So a 100 byte blob should take up 132 bytes.
1114 std::string value(100, 'v');
1115 ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
1116 VerifyDB({{"key1", value}});
1117
1118 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1119
1120 // Adding another 100 bytes blob would take the total size to 264 bytes
1121 // (2*132). max_db_size will be exceeded
1122 // than max_db_size and trigger FIFO eviction.
1123 ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
1124 ASSERT_EQ(1, evict_count);
1125 // key1 will exist until corresponding file be deleted.
1126 VerifyDB({{"key1", value}, {"key2", value}});
1127
1128 // Adding another 100 bytes blob without TTL.
1129 ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
1130 ASSERT_EQ(2, evict_count);
1131 // key1 and key2 will exist until corresponding file be deleted.
1132 VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
1133
1134 // The fourth blob file, without TTL.
1135 ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
1136 ASSERT_EQ(3, evict_count);
1137 VerifyDB(
1138 {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
1139
1140 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1141 ASSERT_EQ(4, blob_files.size());
1142 ASSERT_TRUE(blob_files[0]->Obsolete());
1143 ASSERT_TRUE(blob_files[1]->Obsolete());
1144 ASSERT_TRUE(blob_files[2]->Obsolete());
1145 ASSERT_FALSE(blob_files[3]->Obsolete());
1146 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1147 ASSERT_EQ(3, obsolete_files.size());
1148 ASSERT_EQ(blob_files[0], obsolete_files[0]);
1149 ASSERT_EQ(blob_files[1], obsolete_files[1]);
1150 ASSERT_EQ(blob_files[2], obsolete_files[2]);
1151
1152 blob_db_impl()->TEST_DeleteObsoleteFiles();
1153 obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1154 ASSERT_TRUE(obsolete_files.empty());
1155 VerifyDB({{"key4", value}});
1156 }
1157
1158 TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
1159 Options options;
1160 BlobDBOptions bdb_options;
1161 bdb_options.max_db_size = 1000;
1162 bdb_options.blob_file_size = 5000;
1163 bdb_options.is_fifo = true;
1164 bdb_options.disable_background_tasks = true;
1165 Open(bdb_options);
1166
1167 std::atomic<int> evict_count{0};
1168 SyncPoint::GetInstance()->SetCallBack(
1169 "BlobDBImpl::EvictOldestBlobFile:Evicted",
1170 [&](void *) { evict_count++; });
1171 SyncPoint::GetInstance()->EnableProcessing();
1172
1173 std::string value(2000, 'v');
1174 ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
1175 ASSERT_EQ(0, evict_count);
1176 }
1177
1178 TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
1179 BlobDBOptions bdb_options;
1180 bdb_options.is_fifo = true;
1181 bdb_options.min_blob_size = 100;
1182 bdb_options.disable_background_tasks = true;
1183 Options options;
1184 // Use mock env to stop wall clock.
1185 options.env = mock_env_.get();
1186 options.disable_auto_compactions = true;
1187 auto statistics = CreateDBStatistics();
1188 options.statistics = statistics;
1189 Open(bdb_options, options);
1190
1191 ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
1192 std::string small_value(50, 'v');
1193 std::map<std::string, std::string> data;
1194 // Insert some data into LSM tree to make sure FIFO eviction take SST
1195 // file size into account.
1196 for (int i = 0; i < 1000; i++) {
1197 ASSERT_OK(Put("key" + std::to_string(i), small_value, &data));
1198 }
1199 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1200 uint64_t live_sst_size = 0;
1201 ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
1202 &live_sst_size));
1203 ASSERT_TRUE(live_sst_size > 0);
1204 ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
1205
1206 bdb_options.max_db_size = live_sst_size + 2000;
1207 Reopen(bdb_options, options);
1208 ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
1209
1210 std::string value_1k(1000, 'v');
1211 ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
1212 ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1213 VerifyDB(data);
1214 // large_key2 evicts large_key1
1215 ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
1216 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1217 blob_db_impl()->TEST_DeleteObsoleteFiles();
1218 data.erase("large_key1");
1219 VerifyDB(data);
1220 // large_key3 get no enough space even after evicting large_key2, so it
1221 // instead return no space error.
1222 std::string value_2k(2000, 'v');
1223 ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
1224 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1225 // Verify large_key2 still exists.
1226 VerifyDB(data);
1227 }
1228
1229 // Test flush or compaction will trigger FIFO eviction since they update
1230 // total SST file size.
1231 TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
1232 BlobDBOptions bdb_options;
1233 bdb_options.max_db_size = 1000;
1234 bdb_options.is_fifo = true;
1235 bdb_options.min_blob_size = 100;
1236 bdb_options.disable_background_tasks = true;
1237 Options options;
1238 // Use mock env to stop wall clock.
1239 options.env = mock_env_.get();
1240 auto statistics = CreateDBStatistics();
1241 options.statistics = statistics;
1242 options.compression = kNoCompression;
1243 Open(bdb_options, options);
1244
1245 std::string value(800, 'v');
1246 ASSERT_OK(PutWithTTL("large_key", value, 60));
1247 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1248 ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1249 VerifyDB({{"large_key", value}});
1250
1251 // Insert some small keys and flush to bring DB out of space.
1252 std::map<std::string, std::string> data;
1253 for (int i = 0; i < 10; i++) {
1254 ASSERT_OK(Put("key" + std::to_string(i), "v", &data));
1255 }
1256 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1257
1258 // Verify large_key is deleted by FIFO eviction.
1259 blob_db_impl()->TEST_DeleteObsoleteFiles();
1260 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1261 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1262 VerifyDB(data);
1263 }
1264
1265 TEST_F(BlobDBTest, InlineSmallValues) {
1266 constexpr uint64_t kMaxExpiration = 1000;
1267 Random rnd(301);
1268 BlobDBOptions bdb_options;
1269 bdb_options.ttl_range_secs = kMaxExpiration;
1270 bdb_options.min_blob_size = 100;
1271 bdb_options.blob_file_size = 256 * 1000 * 1000;
1272 bdb_options.disable_background_tasks = true;
1273 Options options;
1274 options.env = mock_env_.get();
1275 mock_clock_->SetCurrentTime(0);
1276 Open(bdb_options, options);
1277 std::map<std::string, std::string> data;
1278 std::map<std::string, KeyVersion> versions;
1279 for (size_t i = 0; i < 1000; i++) {
1280 bool is_small_value = rnd.Next() % 2;
1281 bool has_ttl = rnd.Next() % 2;
1282 uint64_t expiration = rnd.Next() % kMaxExpiration;
1283 int len = is_small_value ? 50 : 200;
1284 std::string key = "key" + std::to_string(i);
1285 std::string value = rnd.HumanReadableString(len);
1286 std::string blob_index;
1287 data[key] = value;
1288 SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1289 if (!has_ttl) {
1290 ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
1291 } else {
1292 ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
1293 }
1294 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1295 versions[key] =
1296 KeyVersion(key, value, sequence,
1297 (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
1298 }
1299 VerifyDB(data);
1300 VerifyBaseDB(versions);
1301 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
1302 auto blob_files = bdb_impl->TEST_GetBlobFiles();
1303 ASSERT_EQ(2, blob_files.size());
1304 std::shared_ptr<BlobFile> non_ttl_file;
1305 std::shared_ptr<BlobFile> ttl_file;
1306 if (blob_files[0]->HasTTL()) {
1307 ttl_file = blob_files[0];
1308 non_ttl_file = blob_files[1];
1309 } else {
1310 non_ttl_file = blob_files[0];
1311 ttl_file = blob_files[1];
1312 }
1313 ASSERT_FALSE(non_ttl_file->HasTTL());
1314 ASSERT_TRUE(ttl_file->HasTTL());
1315 }
1316
1317 TEST_F(BlobDBTest, UserCompactionFilter) {
1318 class CustomerFilter : public CompactionFilter {
1319 public:
1320 bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
1321 std::string *new_value, bool *value_changed) const override {
1322 *value_changed = false;
1323 // changing value size to test value transitions between inlined data
1324 // and stored-in-blob data
1325 if (value.size() % 4 == 1) {
1326 *new_value = value.ToString();
1327 // double size by duplicating value
1328 *new_value += *new_value;
1329 *value_changed = true;
1330 return false;
1331 } else if (value.size() % 3 == 1) {
1332 *new_value = value.ToString();
1333 // trancate value size by half
1334 *new_value = new_value->substr(0, new_value->size() / 2);
1335 *value_changed = true;
1336 return false;
1337 } else if (value.size() % 2 == 1) {
1338 return true;
1339 }
1340 return false;
1341 }
1342 bool IgnoreSnapshots() const override { return true; }
1343 const char *Name() const override { return "CustomerFilter"; }
1344 };
1345 class CustomerFilterFactory : public CompactionFilterFactory {
1346 const char *Name() const override { return "CustomerFilterFactory"; }
1347 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
1348 const CompactionFilter::Context & /*context*/) override {
1349 return std::unique_ptr<CompactionFilter>(new CustomerFilter());
1350 }
1351 };
1352
1353 constexpr size_t kNumPuts = 1 << 10;
1354 // Generate both inlined and blob value
1355 constexpr uint64_t kMinValueSize = 1 << 6;
1356 constexpr uint64_t kMaxValueSize = 1 << 8;
1357 constexpr uint64_t kMinBlobSize = 1 << 7;
1358 static_assert(kMinValueSize < kMinBlobSize, "");
1359 static_assert(kMaxValueSize > kMinBlobSize, "");
1360
1361 BlobDBOptions bdb_options;
1362 bdb_options.min_blob_size = kMinBlobSize;
1363 bdb_options.blob_file_size = kMaxValueSize * 10;
1364 bdb_options.disable_background_tasks = true;
1365 if (Snappy_Supported()) {
1366 bdb_options.compression = CompressionType::kSnappyCompression;
1367 }
1368 // case_num == 0: Test user defined compaction filter
1369 // case_num == 1: Test user defined compaction filter factory
1370 for (int case_num = 0; case_num < 2; case_num++) {
1371 Options options;
1372 if (case_num == 0) {
1373 options.compaction_filter = new CustomerFilter();
1374 } else {
1375 options.compaction_filter_factory.reset(new CustomerFilterFactory());
1376 }
1377 options.disable_auto_compactions = true;
1378 options.env = mock_env_.get();
1379 options.statistics = CreateDBStatistics();
1380 Open(bdb_options, options);
1381
1382 std::map<std::string, std::string> data;
1383 std::map<std::string, std::string> data_after_compact;
1384 Random rnd(301);
1385 uint64_t value_size = kMinValueSize;
1386 int drop_record = 0;
1387 for (size_t i = 0; i < kNumPuts; ++i) {
1388 std::ostringstream oss;
1389 oss << "key" << std::setw(4) << std::setfill('0') << i;
1390
1391 const std::string key(oss.str());
1392 const std::string value = rnd.HumanReadableString((int)value_size);
1393 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1394
1395 ASSERT_OK(Put(key, value));
1396 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1397
1398 data[key] = value;
1399 if (value.length() % 4 == 1) {
1400 data_after_compact[key] = value + value;
1401 } else if (value.length() % 3 == 1) {
1402 data_after_compact[key] = value.substr(0, value.size() / 2);
1403 } else if (value.length() % 2 == 1) {
1404 ++drop_record;
1405 } else {
1406 data_after_compact[key] = value;
1407 }
1408
1409 if (++value_size > kMaxValueSize) {
1410 value_size = kMinValueSize;
1411 }
1412 }
1413 // Verify full data set
1414 VerifyDB(data);
1415 // Applying compaction filter for records
1416 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1417 // Verify data after compaction, only value with even length left.
1418 VerifyDB(data_after_compact);
1419 ASSERT_EQ(drop_record,
1420 options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
1421 delete options.compaction_filter;
1422 Destroy();
1423 }
1424 }
1425
1426 // Test user comapction filter when there is IO error on blob data.
1427 TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
1428 class CustomerFilter : public CompactionFilter {
1429 public:
1430 bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
1431 std::string *new_value, bool *value_changed) const override {
1432 *new_value = value.ToString() + "_new";
1433 *value_changed = true;
1434 return false;
1435 }
1436 bool IgnoreSnapshots() const override { return true; }
1437 const char *Name() const override { return "CustomerFilter"; }
1438 };
1439
1440 constexpr size_t kNumPuts = 100;
1441 constexpr int kValueSize = 100;
1442
1443 BlobDBOptions bdb_options;
1444 bdb_options.min_blob_size = 0;
1445 bdb_options.blob_file_size = kValueSize * 10;
1446 bdb_options.disable_background_tasks = true;
1447 bdb_options.compression = CompressionType::kNoCompression;
1448
1449 std::vector<std::string> io_failure_cases = {
1450 "BlobDBImpl::CreateBlobFileAndWriter",
1451 "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
1452 "BlobDBImpl::CloseBlobFile"};
1453
1454 for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
1455 Options options;
1456 options.compaction_filter = new CustomerFilter();
1457 options.disable_auto_compactions = true;
1458 options.env = fault_injection_env_.get();
1459 options.statistics = CreateDBStatistics();
1460 Open(bdb_options, options);
1461
1462 std::map<std::string, std::string> data;
1463 Random rnd(301);
1464 for (size_t i = 0; i < kNumPuts; ++i) {
1465 std::ostringstream oss;
1466 oss << "key" << std::setw(4) << std::setfill('0') << i;
1467
1468 const std::string key(oss.str());
1469 const std::string value = rnd.HumanReadableString(kValueSize);
1470 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1471
1472 ASSERT_OK(Put(key, value));
1473 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1474 data[key] = value;
1475 }
1476
1477 // Verify full data set
1478 VerifyDB(data);
1479
1480 SyncPoint::GetInstance()->SetCallBack(
1481 io_failure_cases[case_num], [&](void * /*arg*/) {
1482 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
1483 });
1484 SyncPoint::GetInstance()->EnableProcessing();
1485 auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1486 ASSERT_TRUE(s.IsIOError());
1487
1488 // Reactivate file system to allow test to verify and close DB.
1489 fault_injection_env_->SetFilesystemActive(true);
1490 SyncPoint::GetInstance()->DisableProcessing();
1491 SyncPoint::GetInstance()->ClearAllCallBacks();
1492
1493 // Verify full data set after compaction failure
1494 VerifyDB(data);
1495
1496 delete options.compaction_filter;
1497 Destroy();
1498 }
1499 }
1500
1501 // Test comapction filter should remove any expired blob index.
1502 TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
1503 constexpr size_t kNumKeys = 100;
1504 constexpr size_t kNumPuts = 1000;
1505 constexpr uint64_t kMaxExpiration = 1000;
1506 constexpr uint64_t kCompactTime = 500;
1507 constexpr uint64_t kMinBlobSize = 100;
1508 Random rnd(301);
1509 mock_clock_->SetCurrentTime(0);
1510 BlobDBOptions bdb_options;
1511 bdb_options.min_blob_size = kMinBlobSize;
1512 bdb_options.disable_background_tasks = true;
1513 Options options;
1514 options.env = mock_env_.get();
1515 Open(bdb_options, options);
1516
1517 std::map<std::string, std::string> data;
1518 std::map<std::string, std::string> data_after_compact;
1519 for (size_t i = 0; i < kNumPuts; i++) {
1520 bool is_small_value = rnd.Next() % 2;
1521 bool has_ttl = rnd.Next() % 2;
1522 uint64_t expiration = rnd.Next() % kMaxExpiration;
1523 int len = is_small_value ? 10 : 200;
1524 std::string key = "key" + std::to_string(rnd.Next() % kNumKeys);
1525 std::string value = rnd.HumanReadableString(len);
1526 if (!has_ttl) {
1527 if (is_small_value) {
1528 std::string blob_entry;
1529 BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
1530 // Fake blob index with TTL. See what it will do.
1531 ASSERT_GT(kMinBlobSize, blob_entry.size());
1532 value = blob_entry;
1533 }
1534 ASSERT_OK(Put(key, value));
1535 data_after_compact[key] = value;
1536 } else {
1537 ASSERT_OK(PutUntil(key, value, expiration));
1538 if (expiration <= kCompactTime) {
1539 data_after_compact.erase(key);
1540 } else {
1541 data_after_compact[key] = value;
1542 }
1543 }
1544 data[key] = value;
1545 }
1546 VerifyDB(data);
1547
1548 mock_clock_->SetCurrentTime(kCompactTime);
1549 // Take a snapshot before compaction. Make sure expired blob indexes is
1550 // filtered regardless of snapshot.
1551 const Snapshot *snapshot = blob_db_->GetSnapshot();
1552 // Issue manual compaction to trigger compaction filter.
1553 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1554 blob_db_->ReleaseSnapshot(snapshot);
1555 // Verify expired blob index are filtered.
1556 std::vector<KeyVersion> versions;
1557 const size_t kMaxKeys = 10000;
1558 ASSERT_OK(GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions));
1559 ASSERT_EQ(data_after_compact.size(), versions.size());
1560 for (auto &version : versions) {
1561 ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
1562 }
1563 VerifyDB(data_after_compact);
1564 }
1565
1566 // Test compaction filter should remove any blob index where corresponding
1567 // blob file has been removed.
1568 TEST_F(BlobDBTest, FilterFileNotAvailable) {
1569 BlobDBOptions bdb_options;
1570 bdb_options.min_blob_size = 0;
1571 bdb_options.disable_background_tasks = true;
1572 Options options;
1573 options.disable_auto_compactions = true;
1574 Open(bdb_options, options);
1575
1576 ASSERT_OK(Put("foo", "v1"));
1577 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1578 ASSERT_EQ(1, blob_files.size());
1579 ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
1580 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
1581
1582 ASSERT_OK(Put("bar", "v2"));
1583 blob_files = blob_db_impl()->TEST_GetBlobFiles();
1584 ASSERT_EQ(2, blob_files.size());
1585 ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
1586 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
1587
1588 const size_t kMaxKeys = 10000;
1589
1590 DB *base_db = blob_db_->GetRootDB();
1591 std::vector<KeyVersion> versions;
1592 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1593 ASSERT_EQ(2, versions.size());
1594 ASSERT_EQ("bar", versions[0].user_key);
1595 ASSERT_EQ("foo", versions[1].user_key);
1596 VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
1597
1598 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1599 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1600 ASSERT_EQ(2, versions.size());
1601 ASSERT_EQ("bar", versions[0].user_key);
1602 ASSERT_EQ("foo", versions[1].user_key);
1603 VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
1604
1605 // Remove the first blob file and compact. foo should be remove from base db.
1606 blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
1607 blob_db_impl()->TEST_DeleteObsoleteFiles();
1608 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1609 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1610 ASSERT_EQ(1, versions.size());
1611 ASSERT_EQ("bar", versions[0].user_key);
1612 VerifyDB({{"bar", "v2"}});
1613
1614 // Remove the second blob file and compact. bar should be remove from base db.
1615 blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
1616 blob_db_impl()->TEST_DeleteObsoleteFiles();
1617 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1618 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1619 ASSERT_EQ(0, versions.size());
1620 VerifyDB({});
1621 }
1622
1623 // Test compaction filter should filter any inlined TTL keys that would have
1624 // been dropped by last FIFO eviction if they are store out-of-line.
1625 TEST_F(BlobDBTest, FilterForFIFOEviction) {
1626 Random rnd(215);
1627 BlobDBOptions bdb_options;
1628 bdb_options.min_blob_size = 100;
1629 bdb_options.ttl_range_secs = 60;
1630 bdb_options.max_db_size = 0;
1631 bdb_options.disable_background_tasks = true;
1632 Options options;
1633 // Use mock env to stop wall clock.
1634 mock_clock_->SetCurrentTime(0);
1635 options.env = mock_env_.get();
1636 auto statistics = CreateDBStatistics();
1637 options.statistics = statistics;
1638 options.disable_auto_compactions = true;
1639 Open(bdb_options, options);
1640
1641 std::map<std::string, std::string> data;
1642 std::map<std::string, std::string> data_after_compact;
1643 // Insert some small values that will be inlined.
1644 for (int i = 0; i < 1000; i++) {
1645 std::string key = "key" + std::to_string(i);
1646 std::string value = rnd.HumanReadableString(50);
1647 uint64_t ttl = rnd.Next() % 120 + 1;
1648 ASSERT_OK(PutWithTTL(key, value, ttl, &data));
1649 if (ttl >= 60) {
1650 data_after_compact[key] = value;
1651 }
1652 }
1653 uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
1654 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1655 uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
1656 ASSERT_GT(live_sst_size, 0);
1657 VerifyDB(data);
1658
1659 bdb_options.max_db_size = live_sst_size + 30000;
1660 bdb_options.is_fifo = true;
1661 Reopen(bdb_options, options);
1662 VerifyDB(data);
1663
1664 // Put two large values, each on a different blob file.
1665 std::string large_value(10000, 'v');
1666 ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
1667 ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
1668 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1669 ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1670 data["large_key1"] = large_value;
1671 data["large_key2"] = large_value;
1672 VerifyDB(data);
1673
1674 // Put a third large value which will bring the DB out of space.
1675 // FIFO eviction will evict the file of large_key1.
1676 ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
1677 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1678 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1679 blob_db_impl()->TEST_DeleteObsoleteFiles();
1680 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1681 data.erase("large_key1");
1682 data["large_key3"] = large_value;
1683 VerifyDB(data);
1684
1685 // Putting some more small values. These values shouldn't be evicted by
1686 // compaction filter since they are inserted after FIFO eviction.
1687 ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
1688 ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
1689
1690 // FIFO eviction doesn't trigger again since there enough room for the flush.
1691 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1692 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1693
1694 // Manual compact and check if compaction filter evict those keys with
1695 // expiration < 60.
1696 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1697 // All keys with expiration < 60, plus large_key1 is filtered by
1698 // compaction filter.
1699 ASSERT_EQ(num_keys_to_evict + 1,
1700 statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
1701 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1702 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1703 data_after_compact["large_key2"] = large_value;
1704 data_after_compact["large_key3"] = large_value;
1705 VerifyDB(data_after_compact);
1706 }
1707
1708 TEST_F(BlobDBTest, GarbageCollection) {
1709 constexpr size_t kNumPuts = 1 << 10;
1710
1711 constexpr uint64_t kExpiration = 1000;
1712 constexpr uint64_t kCompactTime = 500;
1713
1714 constexpr uint64_t kKeySize = 7; // "key" + 4 digits
1715
1716 constexpr uint64_t kSmallValueSize = 1 << 6;
1717 constexpr uint64_t kLargeValueSize = 1 << 8;
1718 constexpr uint64_t kMinBlobSize = 1 << 7;
1719 static_assert(kSmallValueSize < kMinBlobSize, "");
1720 static_assert(kLargeValueSize > kMinBlobSize, "");
1721
1722 constexpr size_t kBlobsPerFile = 8;
1723 constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
1724 constexpr uint64_t kBlobFileSize =
1725 BlobLogHeader::kSize +
1726 (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
1727
1728 BlobDBOptions bdb_options;
1729 bdb_options.min_blob_size = kMinBlobSize;
1730 bdb_options.blob_file_size = kBlobFileSize;
1731 bdb_options.enable_garbage_collection = true;
1732 bdb_options.garbage_collection_cutoff = 0.25;
1733 bdb_options.disable_background_tasks = true;
1734
1735 Options options;
1736 options.env = mock_env_.get();
1737 options.statistics = CreateDBStatistics();
1738
1739 Open(bdb_options, options);
1740
1741 std::map<std::string, std::string> data;
1742 std::map<std::string, KeyVersion> blob_value_versions;
1743 std::map<std::string, BlobIndexVersion> blob_index_versions;
1744
1745 Random rnd(301);
1746
1747 // Add a bunch of large non-TTL values. These will be written to non-TTL
1748 // blob files and will be subject to GC.
1749 for (size_t i = 0; i < kNumPuts; ++i) {
1750 std::ostringstream oss;
1751 oss << "key" << std::setw(4) << std::setfill('0') << i;
1752
1753 const std::string key(oss.str());
1754 const std::string value = rnd.HumanReadableString(kLargeValueSize);
1755 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1756
1757 ASSERT_OK(Put(key, value));
1758 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1759
1760 data[key] = value;
1761 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1762 blob_index_versions[key] =
1763 BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
1764 sequence, kTypeBlobIndex);
1765 }
1766
1767 // Add some small and/or TTL values that will be ignored during GC.
1768 // First, add a large TTL value will be written to its own TTL blob file.
1769 {
1770 const std::string key("key2000");
1771 const std::string value = rnd.HumanReadableString(kLargeValueSize);
1772 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1773
1774 ASSERT_OK(PutUntil(key, value, kExpiration));
1775 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1776
1777 data[key] = value;
1778 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1779 blob_index_versions[key] =
1780 BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
1781 sequence, kTypeBlobIndex);
1782 }
1783
1784 // Now add a small TTL value (which will be inlined).
1785 {
1786 const std::string key("key3000");
1787 const std::string value = rnd.HumanReadableString(kSmallValueSize);
1788 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1789
1790 ASSERT_OK(PutUntil(key, value, kExpiration));
1791 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1792
1793 data[key] = value;
1794 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1795 blob_index_versions[key] = BlobIndexVersion(
1796 key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
1797 }
1798
1799 // Finally, add a small non-TTL value (which will be stored as a regular
1800 // value).
1801 {
1802 const std::string key("key4000");
1803 const std::string value = rnd.HumanReadableString(kSmallValueSize);
1804 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1805
1806 ASSERT_OK(Put(key, value));
1807 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1808
1809 data[key] = value;
1810 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
1811 blob_index_versions[key] = BlobIndexVersion(
1812 key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
1813 }
1814
1815 VerifyDB(data);
1816 VerifyBaseDB(blob_value_versions);
1817 VerifyBaseDBBlobIndex(blob_index_versions);
1818
1819 // At this point, we should have 128 immutable non-TTL files with file numbers
1820 // 1..128.
1821 {
1822 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1823 ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
1824 for (size_t i = 0; i < kNumBlobFiles; ++i) {
1825 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1826 ASSERT_EQ(live_imm_files[i]->GetFileSize(),
1827 kBlobFileSize + BlobLogFooter::kSize);
1828 }
1829 }
1830
1831 mock_clock_->SetCurrentTime(kCompactTime);
1832
1833 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1834
1835 // We expect the data to remain the same and the blobs from the oldest N files
1836 // to be moved to new files. Sequence numbers get zeroed out during the
1837 // compaction.
1838 VerifyDB(data);
1839
1840 for (auto &pair : blob_value_versions) {
1841 KeyVersion &version = pair.second;
1842 version.sequence = 0;
1843 }
1844
1845 VerifyBaseDB(blob_value_versions);
1846
1847 const uint64_t cutoff = static_cast<uint64_t>(
1848 bdb_options.garbage_collection_cutoff * kNumBlobFiles);
1849 for (auto &pair : blob_index_versions) {
1850 BlobIndexVersion &version = pair.second;
1851
1852 version.sequence = 0;
1853
1854 if (version.file_number == kInvalidBlobFileNumber) {
1855 continue;
1856 }
1857
1858 if (version.file_number > cutoff) {
1859 continue;
1860 }
1861
1862 version.file_number += kNumBlobFiles + 1;
1863 }
1864
1865 VerifyBaseDBBlobIndex(blob_index_versions);
1866
1867 const Statistics *const statistics = options.statistics.get();
1868 assert(statistics);
1869
1870 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
1871 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
1872 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
1873 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
1874 cutoff * kBlobsPerFile);
1875 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
1876 cutoff * kBlobsPerFile * kLargeValueSize);
1877
1878 // At this point, we should have 128 immutable non-TTL files with file numbers
1879 // 33..128 and 130..161. (129 was taken by the TTL blob file.)
1880 {
1881 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1882 ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
1883 for (size_t i = 0; i < kNumBlobFiles; ++i) {
1884 uint64_t expected_file_number = i + cutoff + 1;
1885 if (expected_file_number > kNumBlobFiles) {
1886 ++expected_file_number;
1887 }
1888
1889 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
1890 ASSERT_EQ(live_imm_files[i]->GetFileSize(),
1891 kBlobFileSize + BlobLogFooter::kSize);
1892 }
1893 }
1894 }
1895
1896 TEST_F(BlobDBTest, GarbageCollectionFailure) {
1897 BlobDBOptions bdb_options;
1898 bdb_options.min_blob_size = 0;
1899 bdb_options.enable_garbage_collection = true;
1900 bdb_options.garbage_collection_cutoff = 1.0;
1901 bdb_options.disable_background_tasks = true;
1902
1903 Options db_options;
1904 db_options.statistics = CreateDBStatistics();
1905
1906 Open(bdb_options, db_options);
1907
1908 // Write a couple of valid blobs.
1909 ASSERT_OK(Put("foo", "bar"));
1910 ASSERT_OK(Put("dead", "beef"));
1911
1912 // Write a fake blob reference into the base DB that points to a non-existing
1913 // blob file.
1914 std::string blob_index;
1915 BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
1916 /* size */ 5678, kNoCompression);
1917
1918 WriteBatch batch;
1919 ASSERT_OK(WriteBatchInternal::PutBlobIndex(
1920 &batch, blob_db_->DefaultColumnFamily()->GetID(), "key", blob_index));
1921 ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
1922
1923 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1924 ASSERT_EQ(blob_files.size(), 1);
1925 auto blob_file = blob_files[0];
1926 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
1927
1928 ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
1929 .IsIOError());
1930
1931 const Statistics *const statistics = db_options.statistics.get();
1932 assert(statistics);
1933
1934 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
1935 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
1936 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
1937 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
1938 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
1939 }
1940
1941 // File should be evicted after expiration.
1942 TEST_F(BlobDBTest, EvictExpiredFile) {
1943 BlobDBOptions bdb_options;
1944 bdb_options.ttl_range_secs = 100;
1945 bdb_options.min_blob_size = 0;
1946 bdb_options.disable_background_tasks = true;
1947 Options options;
1948 options.env = mock_env_.get();
1949 Open(bdb_options, options);
1950 mock_clock_->SetCurrentTime(50);
1951 std::map<std::string, std::string> data;
1952 ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
1953 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1954 ASSERT_EQ(1, blob_files.size());
1955 auto blob_file = blob_files[0];
1956 ASSERT_FALSE(blob_file->Immutable());
1957 ASSERT_FALSE(blob_file->Obsolete());
1958 VerifyDB(data);
1959 mock_clock_->SetCurrentTime(250);
1960 // The key should expired now.
1961 blob_db_impl()->TEST_EvictExpiredFiles();
1962 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1963 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1964 ASSERT_TRUE(blob_file->Immutable());
1965 ASSERT_TRUE(blob_file->Obsolete());
1966 blob_db_impl()->TEST_DeleteObsoleteFiles();
1967 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1968 ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
1969 // Make sure we don't return garbage value after blob file being evicted,
1970 // but the blob index still exists in the LSM tree.
1971 std::string val = "";
1972 ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
1973 ASSERT_EQ("", val);
1974 }
1975
1976 TEST_F(BlobDBTest, DisableFileDeletions) {
1977 BlobDBOptions bdb_options;
1978 bdb_options.disable_background_tasks = true;
1979 Open(bdb_options);
1980 std::map<std::string, std::string> data;
1981 for (bool force : {true, false}) {
1982 ASSERT_OK(Put("foo", "v", &data));
1983 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1984 ASSERT_EQ(1, blob_files.size());
1985 auto blob_file = blob_files[0];
1986 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
1987 blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
1988 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1989 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1990 // Call DisableFileDeletions twice.
1991 ASSERT_OK(blob_db_->DisableFileDeletions());
1992 ASSERT_OK(blob_db_->DisableFileDeletions());
1993 // File deletions should be disabled.
1994 blob_db_impl()->TEST_DeleteObsoleteFiles();
1995 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1996 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1997 VerifyDB(data);
1998 // Enable file deletions once. If force=true, file deletion is enabled.
1999 // Otherwise it needs to enable it for a second time.
2000 ASSERT_OK(blob_db_->EnableFileDeletions(force));
2001 blob_db_impl()->TEST_DeleteObsoleteFiles();
2002 if (!force) {
2003 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
2004 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
2005 VerifyDB(data);
2006 // Call EnableFileDeletions a second time.
2007 ASSERT_OK(blob_db_->EnableFileDeletions(false));
2008 blob_db_impl()->TEST_DeleteObsoleteFiles();
2009 }
2010 // Regardless of value of `force`, file should be deleted by now.
2011 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
2012 ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
2013 VerifyDB({});
2014 }
2015 }
2016
2017 TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
2018 BlobDBOptions bdb_options;
2019 bdb_options.enable_garbage_collection = true;
2020 bdb_options.disable_background_tasks = true;
2021 Open(bdb_options);
2022
2023 // Register some dummy blob files.
2024 blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
2025 blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
2026 blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
2027 blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
2028 blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
2029
2030 // Initialize the blob <-> SST file mapping. First, add some SST files with
2031 // blob file references, then some without.
2032 std::vector<LiveFileMetaData> live_files;
2033
2034 for (uint64_t i = 1; i <= 10; ++i) {
2035 LiveFileMetaData live_file;
2036 live_file.file_number = i;
2037 live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
2038
2039 live_files.emplace_back(live_file);
2040 }
2041
2042 for (uint64_t i = 11; i <= 20; ++i) {
2043 LiveFileMetaData live_file;
2044 live_file.file_number = i;
2045
2046 live_files.emplace_back(live_file);
2047 }
2048
2049 blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
2050
2051 // Check that the blob <-> SST mappings have been correctly initialized.
2052 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2053
2054 ASSERT_EQ(blob_files.size(), 5);
2055
2056 {
2057 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2058 ASSERT_EQ(live_imm_files.size(), 5);
2059 for (size_t i = 0; i < 5; ++i) {
2060 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2061 }
2062
2063 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2064 }
2065
2066 {
2067 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2068 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
2069 const std::vector<bool> expected_obsolete{false, false, false, false,
2070 false};
2071 for (size_t i = 0; i < 5; ++i) {
2072 const auto &blob_file = blob_files[i];
2073 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2074 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2075 }
2076
2077 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2078 ASSERT_EQ(live_imm_files.size(), 5);
2079 for (size_t i = 0; i < 5; ++i) {
2080 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2081 }
2082
2083 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2084 }
2085
2086 // Simulate a flush where the SST does not reference any blob files.
2087 {
2088 FlushJobInfo info{};
2089 info.file_number = 21;
2090 info.smallest_seqno = 1;
2091 info.largest_seqno = 100;
2092
2093 blob_db_impl()->TEST_ProcessFlushJobInfo(info);
2094
2095 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2096 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
2097 const std::vector<bool> expected_obsolete{false, false, false, false,
2098 false};
2099 for (size_t i = 0; i < 5; ++i) {
2100 const auto &blob_file = blob_files[i];
2101 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2102 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2103 }
2104
2105 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2106 ASSERT_EQ(live_imm_files.size(), 5);
2107 for (size_t i = 0; i < 5; ++i) {
2108 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2109 }
2110
2111 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2112 }
2113
2114 // Simulate a flush where the SST references a blob file.
2115 {
2116 FlushJobInfo info{};
2117 info.file_number = 22;
2118 info.oldest_blob_file_number = 5;
2119 info.smallest_seqno = 101;
2120 info.largest_seqno = 200;
2121
2122 blob_db_impl()->TEST_ProcessFlushJobInfo(info);
2123
2124 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2125 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
2126 const std::vector<bool> expected_obsolete{false, false, false, false,
2127 false};
2128 for (size_t i = 0; i < 5; ++i) {
2129 const auto &blob_file = blob_files[i];
2130 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2131 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2132 }
2133
2134 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2135 ASSERT_EQ(live_imm_files.size(), 5);
2136 for (size_t i = 0; i < 5; ++i) {
2137 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2138 }
2139
2140 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2141 }
2142
2143 // Simulate a compaction. Some inputs and outputs have blob file references,
2144 // some don't. There is also a trivial move (which means the SST appears on
2145 // both the input and the output list). Blob file 1 loses all its linked SSTs,
2146 // and since it got marked immutable at sequence number 200 which has already
2147 // been flushed, it can be marked obsolete.
2148 {
2149 CompactionJobInfo info{};
2150 info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
2151 info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
2152 info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
2153 info.input_file_infos.emplace_back(
2154 CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
2155 info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
2156 info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
2157 info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
2158 info.output_file_infos.emplace_back(
2159 CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
2160
2161 blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
2162
2163 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2164 {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
2165 const std::vector<bool> expected_obsolete{true, false, false, false, false};
2166 for (size_t i = 0; i < 5; ++i) {
2167 const auto &blob_file = blob_files[i];
2168 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2169 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2170 }
2171
2172 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2173 ASSERT_EQ(live_imm_files.size(), 4);
2174 for (size_t i = 0; i < 4; ++i) {
2175 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
2176 }
2177
2178 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2179 ASSERT_EQ(obsolete_files.size(), 1);
2180 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2181 }
2182
2183 // Simulate a failed compaction. No mappings should be updated.
2184 {
2185 CompactionJobInfo info{};
2186 info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
2187 info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
2188 info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
2189 info.status = Status::Corruption();
2190
2191 blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
2192
2193 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2194 {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
2195 const std::vector<bool> expected_obsolete{true, false, false, false, false};
2196 for (size_t i = 0; i < 5; ++i) {
2197 const auto &blob_file = blob_files[i];
2198 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2199 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2200 }
2201
2202 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2203 ASSERT_EQ(live_imm_files.size(), 4);
2204 for (size_t i = 0; i < 4; ++i) {
2205 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
2206 }
2207
2208 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2209 ASSERT_EQ(obsolete_files.size(), 1);
2210 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2211 }
2212
2213 // Simulate another compaction. Blob file 2 loses all its linked SSTs
2214 // but since it got marked immutable at sequence number 300 which hasn't
2215 // been flushed yet, it cannot be marked obsolete at this point.
2216 {
2217 CompactionJobInfo info{};
2218 info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
2219 info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
2220 info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
2221
2222 blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
2223
2224 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2225 {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
2226 const std::vector<bool> expected_obsolete{true, false, false, false, false};
2227 for (size_t i = 0; i < 5; ++i) {
2228 const auto &blob_file = blob_files[i];
2229 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2230 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2231 }
2232
2233 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2234 ASSERT_EQ(live_imm_files.size(), 4);
2235 for (size_t i = 0; i < 4; ++i) {
2236 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
2237 }
2238
2239 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2240 ASSERT_EQ(obsolete_files.size(), 1);
2241 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2242 }
2243
2244 // Simulate a flush with largest sequence number 300. This will make it
2245 // possible to mark blob file 2 obsolete.
2246 {
2247 FlushJobInfo info{};
2248 info.file_number = 26;
2249 info.smallest_seqno = 201;
2250 info.largest_seqno = 300;
2251
2252 blob_db_impl()->TEST_ProcessFlushJobInfo(info);
2253
2254 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2255 {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
2256 const std::vector<bool> expected_obsolete{true, true, false, false, false};
2257 for (size_t i = 0; i < 5; ++i) {
2258 const auto &blob_file = blob_files[i];
2259 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2260 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2261 }
2262
2263 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2264 ASSERT_EQ(live_imm_files.size(), 3);
2265 for (size_t i = 0; i < 3; ++i) {
2266 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
2267 }
2268
2269 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2270 ASSERT_EQ(obsolete_files.size(), 2);
2271 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2272 ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
2273 }
2274 }
2275
2276 TEST_F(BlobDBTest, ShutdownWait) {
2277 BlobDBOptions bdb_options;
2278 bdb_options.ttl_range_secs = 100;
2279 bdb_options.min_blob_size = 0;
2280 bdb_options.disable_background_tasks = false;
2281 Options options;
2282 options.env = mock_env_.get();
2283
2284 SyncPoint::GetInstance()->LoadDependency({
2285 {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
2286 {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
2287 {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
2288 {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
2289 });
2290 // Force all tasks to be scheduled immediately.
2291 SyncPoint::GetInstance()->SetCallBack(
2292 "TimeQueue::Add:item.end", [&](void *arg) {
2293 std::chrono::steady_clock::time_point *tp =
2294 static_cast<std::chrono::steady_clock::time_point *>(arg);
2295 *tp =
2296 std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
2297 });
2298
2299 SyncPoint::GetInstance()->SetCallBack(
2300 "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
2301 // Sleep 3 ms to increase the chance of data race.
2302 // We've synced up the code so that EvictExpiredFiles()
2303 // is called concurrently with ~BlobDBImpl().
2304 // ~BlobDBImpl() is supposed to wait for all background
2305 // task to shutdown before doing anything else. In order
2306 // to use the same test to reproduce a bug of the waiting
2307 // logic, we wait a little bit here, so that TSAN can
2308 // catch the data race.
2309 // We should improve the test if we find a better way.
2310 Env::Default()->SleepForMicroseconds(3000);
2311 });
2312
2313 SyncPoint::GetInstance()->EnableProcessing();
2314
2315 Open(bdb_options, options);
2316 mock_clock_->SetCurrentTime(50);
2317 std::map<std::string, std::string> data;
2318 ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
2319 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2320 ASSERT_EQ(1, blob_files.size());
2321 auto blob_file = blob_files[0];
2322 ASSERT_FALSE(blob_file->Immutable());
2323 ASSERT_FALSE(blob_file->Obsolete());
2324 VerifyDB(data);
2325
2326 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
2327 mock_clock_->SetCurrentTime(250);
2328 // The key should expired now.
2329 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
2330
2331 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
2332 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
2333 Close();
2334
2335 SyncPoint::GetInstance()->DisableProcessing();
2336 }
2337
2338 TEST_F(BlobDBTest, SyncBlobFileBeforeClose) {
2339 Options options;
2340 options.statistics = CreateDBStatistics();
2341
2342 BlobDBOptions blob_options;
2343 blob_options.min_blob_size = 0;
2344 blob_options.bytes_per_sync = 1 << 20;
2345 blob_options.disable_background_tasks = true;
2346
2347 Open(blob_options, options);
2348
2349 ASSERT_OK(Put("foo", "bar"));
2350
2351 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2352 ASSERT_EQ(blob_files.size(), 1);
2353
2354 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
2355 ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1);
2356 }
2357
2358 TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
2359 Options options;
2360 options.env = fault_injection_env_.get();
2361
2362 BlobDBOptions blob_options;
2363 blob_options.min_blob_size = 0;
2364 blob_options.bytes_per_sync = 1 << 20;
2365 blob_options.disable_background_tasks = true;
2366
2367 Open(blob_options, options);
2368
2369 ASSERT_OK(Put("foo", "bar"));
2370
2371 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2372 ASSERT_EQ(blob_files.size(), 1);
2373
2374 SyncPoint::GetInstance()->SetCallBack(
2375 "BlobLogWriter::Sync", [this](void * /* arg */) {
2376 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
2377 });
2378 SyncPoint::GetInstance()->EnableProcessing();
2379
2380 const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]);
2381
2382 fault_injection_env_->SetFilesystemActive(true);
2383 SyncPoint::GetInstance()->DisableProcessing();
2384 SyncPoint::GetInstance()->ClearAllCallBacks();
2385
2386 ASSERT_TRUE(s.IsIOError());
2387 }
2388
2389 } // namespace blob_db
2390 } // namespace ROCKSDB_NAMESPACE
2391
2392 // A black-box test for the ttl wrapper around rocksdb
2393 int main(int argc, char **argv) {
2394 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2395 ::testing::InitGoogleTest(&argc, argv);
2396 return RUN_ALL_TESTS();
2397 }
2398
2399 #else
2400 #include <stdio.h>
2401
// Stand-in entry point for ROCKSDB_LITE builds: reports on stderr that the
// test has been skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
2406
2407 #endif // !ROCKSDB_LITE