]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/blob_db/blob_db_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_db_test.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
20effc67
TL
8#include "utilities/blob_db/blob_db.h"
9
11fdf7f2 10#include <algorithm>
494da23a 11#include <chrono>
11fdf7f2 12#include <cstdlib>
f67539c2 13#include <iomanip>
11fdf7f2
TL
14#include <map>
15#include <memory>
f67539c2 16#include <sstream>
11fdf7f2
TL
17#include <string>
18#include <vector>
19
20effc67 20#include "db/blob/blob_index.h"
11fdf7f2 21#include "db/db_test_util.h"
f67539c2
TL
22#include "env/composite_env_wrapper.h"
23#include "file/file_util.h"
24#include "file/sst_file_manager_impl.h"
11fdf7f2
TL
25#include "port/port.h"
26#include "rocksdb/utilities/debug.h"
f67539c2
TL
27#include "test_util/sync_point.h"
28#include "test_util/testharness.h"
7c673cae 29#include "util/random.h"
11fdf7f2 30#include "util/string_util.h"
11fdf7f2 31#include "utilities/blob_db/blob_db_impl.h"
20effc67 32#include "utilities/fault_injection_env.h"
7c673cae 33
f67539c2 34namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
35namespace blob_db {
36
7c673cae
FG
37class BlobDBTest : public testing::Test {
38 public:
11fdf7f2
TL
39 const int kMaxBlobSize = 1 << 14;
40
f67539c2
TL
41 struct BlobIndexVersion {
42 BlobIndexVersion() = default;
43 BlobIndexVersion(std::string _user_key, uint64_t _file_number,
44 uint64_t _expiration, SequenceNumber _sequence,
45 ValueType _type)
46 : user_key(std::move(_user_key)),
47 file_number(_file_number),
48 expiration(_expiration),
49 sequence(_sequence),
50 type(_type) {}
51
52 std::string user_key;
53 uint64_t file_number = kInvalidBlobFileNumber;
54 uint64_t expiration = kNoExpiration;
55 SequenceNumber sequence = 0;
56 ValueType type = kTypeValue;
11fdf7f2
TL
57 };
58
59 BlobDBTest()
60 : dbname_(test::PerThreadDBPath("blob_db_test")),
61 mock_env_(new MockTimeEnv(Env::Default())),
62 fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
63 blob_db_(nullptr) {
64 Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
65 assert(s.ok());
66 }
67
494da23a 68 ~BlobDBTest() override {
11fdf7f2
TL
69 SyncPoint::GetInstance()->ClearAllCallBacks();
70 Destroy();
71 }
72
73 Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
74 Options options = Options()) {
7c673cae 75 options.create_if_missing = true;
20effc67
TL
76 if (options.env == mock_env_.get()) {
77 // Need to disable stats dumping and persisting which also use
78 // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
79 // With mocked time, this can hang on some platforms (MacOS)
80 // because (a) on some platforms, pthread_cond_timedwait does not appear
81 // to release the lock for other threads to operate if the deadline time
82 // is already passed, and (b) TimedWait calls are currently a bad
83 // abstraction because the deadline parameter is usually computed from
84 // Env time, but is interpreted in real clock time.
85 options.stats_dump_period_sec = 0;
86 options.stats_persist_period_sec = 0;
87 }
11fdf7f2
TL
88 return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
89 }
90
91 void Open(BlobDBOptions bdb_options = BlobDBOptions(),
92 Options options = Options()) {
93 ASSERT_OK(TryOpen(bdb_options, options));
94 }
95
96 void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
97 Options options = Options()) {
98 assert(blob_db_ != nullptr);
99 delete blob_db_;
100 blob_db_ = nullptr;
101 Open(bdb_options, options);
102 }
103
494da23a
TL
104 void Close() {
105 assert(blob_db_ != nullptr);
106 delete blob_db_;
107 blob_db_ = nullptr;
108 }
109
11fdf7f2
TL
110 void Destroy() {
111 if (blob_db_) {
112 Options options = blob_db_->GetOptions();
113 BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
114 delete blob_db_;
115 blob_db_ = nullptr;
116 ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
117 }
118 }
119
120 BlobDBImpl *blob_db_impl() {
121 return reinterpret_cast<BlobDBImpl *>(blob_db_);
122 }
123
124 Status Put(const Slice &key, const Slice &value,
125 std::map<std::string, std::string> *data = nullptr) {
126 Status s = blob_db_->Put(WriteOptions(), key, value);
127 if (data != nullptr) {
128 (*data)[key.ToString()] = value.ToString();
129 }
130 return s;
131 }
132
133 void Delete(const std::string &key,
134 std::map<std::string, std::string> *data = nullptr) {
135 ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
136 if (data != nullptr) {
137 data->erase(key);
138 }
139 }
140
141 Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
142 std::map<std::string, std::string> *data = nullptr) {
143 Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
144 if (data != nullptr) {
145 (*data)[key.ToString()] = value.ToString();
146 }
147 return s;
148 }
149
150 Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
151 return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
7c673cae
FG
152 }
153
11fdf7f2
TL
154 void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
155 std::map<std::string, std::string> *data = nullptr) {
156 int len = rnd->Next() % kMaxBlobSize + 1;
20effc67 157 std::string value = rnd->HumanReadableString(len);
11fdf7f2
TL
158 ASSERT_OK(
159 blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
160 if (data != nullptr) {
161 (*data)[key] = value;
162 }
163 }
164
165 void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
166 std::map<std::string, std::string> *data = nullptr) {
167 int len = rnd->Next() % kMaxBlobSize + 1;
20effc67 168 std::string value = rnd->HumanReadableString(len);
11fdf7f2
TL
169 ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
170 expiration));
171 if (data != nullptr) {
172 (*data)[key] = value;
173 }
174 }
175
176 void PutRandom(const std::string &key, Random *rnd,
177 std::map<std::string, std::string> *data = nullptr) {
178 PutRandom(blob_db_, key, rnd, data);
179 }
180
181 void PutRandom(DB *db, const std::string &key, Random *rnd,
182 std::map<std::string, std::string> *data = nullptr) {
183 int len = rnd->Next() % kMaxBlobSize + 1;
20effc67 184 std::string value = rnd->HumanReadableString(len);
11fdf7f2
TL
185 ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
186 if (data != nullptr) {
187 (*data)[key] = value;
188 }
189 }
190
191 void PutRandomToWriteBatch(
192 const std::string &key, Random *rnd, WriteBatch *batch,
193 std::map<std::string, std::string> *data = nullptr) {
194 int len = rnd->Next() % kMaxBlobSize + 1;
20effc67 195 std::string value = rnd->HumanReadableString(len);
11fdf7f2
TL
196 ASSERT_OK(batch->Put(key, value));
197 if (data != nullptr) {
198 (*data)[key] = value;
199 }
200 }
201
202 // Verify blob db contain expected data and nothing more.
203 void VerifyDB(const std::map<std::string, std::string> &data) {
204 VerifyDB(blob_db_, data);
205 }
206
207 void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
208 // Verify normal Get
209 auto* cfh = db->DefaultColumnFamily();
210 for (auto &p : data) {
211 PinnableSlice value_slice;
212 ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
213 ASSERT_EQ(p.second, value_slice.ToString());
214 std::string value;
215 ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
216 ASSERT_EQ(p.second, value);
217 }
218
219 // Verify iterators
220 Iterator *iter = db->NewIterator(ReadOptions());
221 iter->SeekToFirst();
222 for (auto &p : data) {
223 ASSERT_TRUE(iter->Valid());
224 ASSERT_EQ(p.first, iter->key().ToString());
225 ASSERT_EQ(p.second, iter->value().ToString());
226 iter->Next();
227 }
228 ASSERT_FALSE(iter->Valid());
229 ASSERT_OK(iter->status());
230 delete iter;
231 }
7c673cae 232
11fdf7f2
TL
233 void VerifyBaseDB(
234 const std::map<std::string, KeyVersion> &expected_versions) {
235 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
236 DB *db = blob_db_->GetRootDB();
237 const size_t kMaxKeys = 10000;
238 std::vector<KeyVersion> versions;
239 GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
240 ASSERT_EQ(expected_versions.size(), versions.size());
241 size_t i = 0;
242 for (auto &key_version : expected_versions) {
243 const KeyVersion &expected_version = key_version.second;
244 ASSERT_EQ(expected_version.user_key, versions[i].user_key);
245 ASSERT_EQ(expected_version.sequence, versions[i].sequence);
246 ASSERT_EQ(expected_version.type, versions[i].type);
247 if (versions[i].type == kTypeValue) {
248 ASSERT_EQ(expected_version.value, versions[i].value);
249 } else {
250 ASSERT_EQ(kTypeBlobIndex, versions[i].type);
251 PinnableSlice value;
252 ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
253 versions[i].value, &value));
254 ASSERT_EQ(expected_version.value, value.ToString());
255 }
256 i++;
257 }
258 }
259
f67539c2
TL
260 void VerifyBaseDBBlobIndex(
261 const std::map<std::string, BlobIndexVersion> &expected_versions) {
262 const size_t kMaxKeys = 10000;
263 std::vector<KeyVersion> versions;
264 ASSERT_OK(
265 GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
266 ASSERT_EQ(versions.size(), expected_versions.size());
267
268 size_t i = 0;
269 for (const auto &expected_pair : expected_versions) {
270 const BlobIndexVersion &expected_version = expected_pair.second;
271
272 ASSERT_EQ(versions[i].user_key, expected_version.user_key);
273 ASSERT_EQ(versions[i].sequence, expected_version.sequence);
274 ASSERT_EQ(versions[i].type, expected_version.type);
275 if (versions[i].type != kTypeBlobIndex) {
276 ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
277 ASSERT_EQ(kNoExpiration, expected_version.expiration);
278
279 ++i;
280 continue;
281 }
282
283 BlobIndex blob_index;
284 ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
285
286 const uint64_t file_number = !blob_index.IsInlined()
287 ? blob_index.file_number()
288 : kInvalidBlobFileNumber;
289 ASSERT_EQ(file_number, expected_version.file_number);
290
291 const uint64_t expiration =
292 blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
293 ASSERT_EQ(expiration, expected_version.expiration);
294
295 ++i;
296 }
297 }
298
11fdf7f2
TL
299 void InsertBlobs() {
300 WriteOptions wo;
301 std::string value;
302
303 Random rnd(301);
304 for (size_t i = 0; i < 100000; i++) {
305 uint64_t ttl = rnd.Next() % 86400;
306 PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
307 }
308
309 for (size_t i = 0; i < 10; i++) {
310 Delete("key" + ToString(i % 500));
311 }
312 }
313
314 const std::string dbname_;
315 std::unique_ptr<MockTimeEnv> mock_env_;
316 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
317 BlobDB *blob_db_;
7c673cae
FG
318}; // class BlobDBTest
319
11fdf7f2
TL
320TEST_F(BlobDBTest, Put) {
321 Random rnd(301);
322 BlobDBOptions bdb_options;
323 bdb_options.min_blob_size = 0;
324 bdb_options.disable_background_tasks = true;
325 Open(bdb_options);
326 std::map<std::string, std::string> data;
327 for (size_t i = 0; i < 100; i++) {
328 PutRandom("key" + ToString(i), &rnd, &data);
329 }
330 VerifyDB(data);
331}
7c673cae 332
11fdf7f2
TL
333TEST_F(BlobDBTest, PutWithTTL) {
334 Random rnd(301);
335 Options options;
336 options.env = mock_env_.get();
337 BlobDBOptions bdb_options;
338 bdb_options.ttl_range_secs = 1000;
339 bdb_options.min_blob_size = 0;
340 bdb_options.blob_file_size = 256 * 1000 * 1000;
341 bdb_options.disable_background_tasks = true;
342 Open(bdb_options, options);
343 std::map<std::string, std::string> data;
344 mock_env_->set_current_time(50);
345 for (size_t i = 0; i < 100; i++) {
346 uint64_t ttl = rnd.Next() % 100;
347 PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
348 (ttl <= 50 ? nullptr : &data));
349 }
350 mock_env_->set_current_time(100);
351 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
352 auto blob_files = bdb_impl->TEST_GetBlobFiles();
353 ASSERT_EQ(1, blob_files.size());
354 ASSERT_TRUE(blob_files[0]->HasTTL());
355 ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
11fdf7f2
TL
356 VerifyDB(data);
357}
7c673cae 358
11fdf7f2
TL
359TEST_F(BlobDBTest, PutUntil) {
360 Random rnd(301);
361 Options options;
362 options.env = mock_env_.get();
363 BlobDBOptions bdb_options;
364 bdb_options.ttl_range_secs = 1000;
365 bdb_options.min_blob_size = 0;
366 bdb_options.blob_file_size = 256 * 1000 * 1000;
367 bdb_options.disable_background_tasks = true;
368 Open(bdb_options, options);
369 std::map<std::string, std::string> data;
370 mock_env_->set_current_time(50);
371 for (size_t i = 0; i < 100; i++) {
372 uint64_t expiration = rnd.Next() % 100 + 50;
373 PutRandomUntil("key" + ToString(i), expiration, &rnd,
374 (expiration <= 100 ? nullptr : &data));
375 }
376 mock_env_->set_current_time(100);
377 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
378 auto blob_files = bdb_impl->TEST_GetBlobFiles();
379 ASSERT_EQ(1, blob_files.size());
380 ASSERT_TRUE(blob_files[0]->HasTTL());
381 ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
11fdf7f2
TL
382 VerifyDB(data);
383}
384
385TEST_F(BlobDBTest, StackableDBGet) {
386 Random rnd(301);
387 BlobDBOptions bdb_options;
388 bdb_options.min_blob_size = 0;
389 bdb_options.disable_background_tasks = true;
390 Open(bdb_options);
391 std::map<std::string, std::string> data;
392 for (size_t i = 0; i < 100; i++) {
393 PutRandom("key" + ToString(i), &rnd, &data);
394 }
395 for (size_t i = 0; i < 100; i++) {
396 StackableDB *db = blob_db_;
397 ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
398 std::string key = "key" + ToString(i);
399 PinnableSlice pinnable_value;
400 ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
401 std::string string_value;
402 ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
403 ASSERT_EQ(string_value, pinnable_value.ToString());
404 ASSERT_EQ(string_value, data[key]);
405 }
7c673cae
FG
406}
407
11fdf7f2
TL
408TEST_F(BlobDBTest, GetExpiration) {
409 Options options;
410 options.env = mock_env_.get();
411 BlobDBOptions bdb_options;
412 bdb_options.disable_background_tasks = true;
413 mock_env_->set_current_time(100);
414 Open(bdb_options, options);
415 Put("key1", "value1");
416 PutWithTTL("key2", "value2", 200);
417 PinnableSlice value;
418 uint64_t expiration;
419 ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
420 ASSERT_EQ("value1", value.ToString());
421 ASSERT_EQ(kNoExpiration, expiration);
422 ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
423 ASSERT_EQ("value2", value.ToString());
424 ASSERT_EQ(300 /* = 100 + 200 */, expiration);
425}
426
427TEST_F(BlobDBTest, GetIOError) {
428 Options options;
429 options.env = fault_injection_env_.get();
430 BlobDBOptions bdb_options;
431 bdb_options.min_blob_size = 0; // Make sure value write to blob file
432 bdb_options.disable_background_tasks = true;
433 Open(bdb_options, options);
434 ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
435 PinnableSlice value;
436 ASSERT_OK(Put("foo", "bar"));
437 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
438 Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
439 ASSERT_TRUE(s.IsIOError());
440 // Reactivate file system to allow test to close DB.
441 fault_injection_env_->SetFilesystemActive(true);
442}
443
494da23a
TL
444TEST_F(BlobDBTest, PutIOError) {
445 Options options;
446 options.env = fault_injection_env_.get();
447 BlobDBOptions bdb_options;
448 bdb_options.min_blob_size = 0; // Make sure value write to blob file
449 bdb_options.disable_background_tasks = true;
450 Open(bdb_options, options);
451 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
452 ASSERT_TRUE(Put("foo", "v1").IsIOError());
453 fault_injection_env_->SetFilesystemActive(true, Status::IOError());
454 ASSERT_OK(Put("bar", "v1"));
455}
456
11fdf7f2 457TEST_F(BlobDBTest, WriteBatch) {
7c673cae 458 Random rnd(301);
11fdf7f2
TL
459 BlobDBOptions bdb_options;
460 bdb_options.min_blob_size = 0;
461 bdb_options.disable_background_tasks = true;
462 Open(bdb_options);
463 std::map<std::string, std::string> data;
464 for (size_t i = 0; i < 100; i++) {
465 WriteBatch batch;
466 for (size_t j = 0; j < 10; j++) {
467 PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
468 }
469 blob_db_->Write(WriteOptions(), &batch);
470 }
471 VerifyDB(data);
472}
7c673cae 473
11fdf7f2
TL
474TEST_F(BlobDBTest, Delete) {
475 Random rnd(301);
476 BlobDBOptions bdb_options;
477 bdb_options.min_blob_size = 0;
478 bdb_options.disable_background_tasks = true;
479 Open(bdb_options);
480 std::map<std::string, std::string> data;
481 for (size_t i = 0; i < 100; i++) {
482 PutRandom("key" + ToString(i), &rnd, &data);
483 }
484 for (size_t i = 0; i < 100; i += 5) {
485 Delete("key" + ToString(i), &data);
486 }
487 VerifyDB(data);
488}
489
490TEST_F(BlobDBTest, DeleteBatch) {
491 Random rnd(301);
492 BlobDBOptions bdb_options;
493 bdb_options.min_blob_size = 0;
494 bdb_options.disable_background_tasks = true;
495 Open(bdb_options);
496 for (size_t i = 0; i < 100; i++) {
497 PutRandom("key" + ToString(i), &rnd);
498 }
499 WriteBatch batch;
500 for (size_t i = 0; i < 100; i++) {
501 batch.Delete("key" + ToString(i));
502 }
503 ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
504 // DB should be empty.
505 VerifyDB({});
506}
507
508TEST_F(BlobDBTest, Override) {
509 Random rnd(301);
510 BlobDBOptions bdb_options;
511 bdb_options.min_blob_size = 0;
512 bdb_options.disable_background_tasks = true;
513 Open(bdb_options);
514 std::map<std::string, std::string> data;
515 for (int i = 0; i < 10000; i++) {
516 PutRandom("key" + ToString(i), &rnd, nullptr);
517 }
518 // override all the keys
519 for (int i = 0; i < 10000; i++) {
520 PutRandom("key" + ToString(i), &rnd, &data);
521 }
522 VerifyDB(data);
523}
7c673cae 524
11fdf7f2
TL
525#ifdef SNAPPY
526TEST_F(BlobDBTest, Compression) {
527 Random rnd(301);
528 BlobDBOptions bdb_options;
529 bdb_options.min_blob_size = 0;
530 bdb_options.disable_background_tasks = true;
531 bdb_options.compression = CompressionType::kSnappyCompression;
532 Open(bdb_options);
533 std::map<std::string, std::string> data;
534 for (size_t i = 0; i < 100; i++) {
535 PutRandom("put-key" + ToString(i), &rnd, &data);
536 }
537 for (int i = 0; i < 100; i++) {
538 WriteBatch batch;
539 for (size_t j = 0; j < 10; j++) {
540 PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
541 &batch, &data);
542 }
543 blob_db_->Write(WriteOptions(), &batch);
544 }
545 VerifyDB(data);
546}
547
548TEST_F(BlobDBTest, DecompressAfterReopen) {
549 Random rnd(301);
550 BlobDBOptions bdb_options;
551 bdb_options.min_blob_size = 0;
552 bdb_options.disable_background_tasks = true;
553 bdb_options.compression = CompressionType::kSnappyCompression;
554 Open(bdb_options);
555 std::map<std::string, std::string> data;
556 for (size_t i = 0; i < 100; i++) {
557 PutRandom("put-key" + ToString(i), &rnd, &data);
558 }
559 VerifyDB(data);
560 bdb_options.compression = CompressionType::kNoCompression;
561 Reopen(bdb_options);
562 VerifyDB(data);
563}
20effc67
TL
564
565TEST_F(BlobDBTest, EnableDisableCompressionGC) {
566 Random rnd(301);
567 BlobDBOptions bdb_options;
568 bdb_options.min_blob_size = 0;
569 bdb_options.enable_garbage_collection = true;
570 bdb_options.garbage_collection_cutoff = 1.0;
571 bdb_options.disable_background_tasks = true;
572 bdb_options.compression = kSnappyCompression;
573 Open(bdb_options);
574 std::map<std::string, std::string> data;
575 size_t data_idx = 0;
576 for (; data_idx < 100; data_idx++) {
577 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
578 }
579 VerifyDB(data);
580 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
581 ASSERT_EQ(1, blob_files.size());
582 ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType());
583
584 // disable compression
585 bdb_options.compression = kNoCompression;
586 Reopen(bdb_options);
587
588 // Add more data with new compression type
589 for (; data_idx < 200; data_idx++) {
590 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
591 }
592 VerifyDB(data);
593
594 blob_files = blob_db_impl()->TEST_GetBlobFiles();
595 ASSERT_EQ(2, blob_files.size());
596 ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType());
597
598 // Trigger compaction
599 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
600 blob_db_impl()->TEST_DeleteObsoleteFiles();
601 VerifyDB(data);
602
603 blob_files = blob_db_impl()->TEST_GetBlobFiles();
604 for (auto bfile : blob_files) {
605 ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
606 }
607
608 // enabling the compression again
609 bdb_options.compression = kSnappyCompression;
610 Reopen(bdb_options);
611
612 // Add more data with new compression type
613 for (; data_idx < 300; data_idx++) {
614 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
615 }
616 VerifyDB(data);
617
618 // Trigger compaction
619 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
620 blob_db_impl()->TEST_DeleteObsoleteFiles();
621 VerifyDB(data);
622
623 blob_files = blob_db_impl()->TEST_GetBlobFiles();
624 for (auto bfile : blob_files) {
625 ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
626 }
627}
628
629#ifdef LZ4
630// Test switch compression types and run GC, it needs both Snappy and LZ4
631// support.
632TEST_F(BlobDBTest, ChangeCompressionGC) {
633 Random rnd(301);
634 BlobDBOptions bdb_options;
635 bdb_options.min_blob_size = 0;
636 bdb_options.enable_garbage_collection = true;
637 bdb_options.garbage_collection_cutoff = 1.0;
638 bdb_options.disable_background_tasks = true;
639 bdb_options.compression = kLZ4Compression;
640 Open(bdb_options);
641 std::map<std::string, std::string> data;
642 size_t data_idx = 0;
643 for (; data_idx < 100; data_idx++) {
644 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
645 }
646 VerifyDB(data);
647 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
648 ASSERT_EQ(1, blob_files.size());
649 ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType());
650
651 // Change compression type
652 bdb_options.compression = kSnappyCompression;
653 Reopen(bdb_options);
654
655 // Add more data with Snappy compression type
656 for (; data_idx < 200; data_idx++) {
657 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
658 }
659 VerifyDB(data);
660
661 // Verify blob file compression type
662 blob_files = blob_db_impl()->TEST_GetBlobFiles();
663 ASSERT_EQ(2, blob_files.size());
664 ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType());
665
666 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
667 VerifyDB(data);
668
669 blob_db_impl()->TEST_DeleteObsoleteFiles();
670 blob_files = blob_db_impl()->TEST_GetBlobFiles();
671 for (auto bfile : blob_files) {
672 ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
673 }
674
675 // Disable compression
676 bdb_options.compression = kNoCompression;
677 Reopen(bdb_options);
678 for (; data_idx < 300; data_idx++) {
679 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
680 }
681 VerifyDB(data);
682
683 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
684 VerifyDB(data);
685
686 blob_db_impl()->TEST_DeleteObsoleteFiles();
687 blob_files = blob_db_impl()->TEST_GetBlobFiles();
688 for (auto bfile : blob_files) {
689 ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
690 }
691
692 // switching different compression types to generate mixed compression types
693 bdb_options.compression = kSnappyCompression;
694 Reopen(bdb_options);
695 for (; data_idx < 400; data_idx++) {
696 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
697 }
698 VerifyDB(data);
699
700 bdb_options.compression = kLZ4Compression;
701 Reopen(bdb_options);
702 for (; data_idx < 500; data_idx++) {
703 PutRandom("put-key" + ToString(data_idx), &rnd, &data);
704 }
705 VerifyDB(data);
706
707 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
708 VerifyDB(data);
709
710 blob_db_impl()->TEST_DeleteObsoleteFiles();
711 blob_files = blob_db_impl()->TEST_GetBlobFiles();
712 for (auto bfile : blob_files) {
713 ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType());
714 }
715}
716#endif // LZ4
717#endif // SNAPPY
11fdf7f2
TL
718
719TEST_F(BlobDBTest, MultipleWriters) {
720 Open(BlobDBOptions());
721
722 std::vector<port::Thread> workers;
723 std::vector<std::map<std::string, std::string>> data_set(10);
724 for (uint32_t i = 0; i < 10; i++)
725 workers.push_back(port::Thread(
726 [&](uint32_t id) {
727 Random rnd(301 + id);
728 for (int j = 0; j < 100; j++) {
729 std::string key = "key" + ToString(id) + "_" + ToString(j);
730 if (id < 5) {
731 PutRandom(key, &rnd, &data_set[id]);
732 } else {
733 WriteBatch batch;
734 PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
735 blob_db_->Write(WriteOptions(), &batch);
736 }
737 }
738 },
739 i));
740 std::map<std::string, std::string> data;
741 for (size_t i = 0; i < 10; i++) {
742 workers[i].join();
743 data.insert(data_set[i].begin(), data_set[i].end());
744 }
745 VerifyDB(data);
746}
747
494da23a
TL
748TEST_F(BlobDBTest, SstFileManager) {
749 // run the same test for Get(), MultiGet() and Iterator each.
750 std::shared_ptr<SstFileManager> sst_file_manager(
751 NewSstFileManager(mock_env_.get()));
752 sst_file_manager->SetDeleteRateBytesPerSecond(1);
753 SstFileManagerImpl *sfm =
754 static_cast<SstFileManagerImpl *>(sst_file_manager.get());
755
756 BlobDBOptions bdb_options;
757 bdb_options.min_blob_size = 0;
f67539c2
TL
758 bdb_options.enable_garbage_collection = true;
759 bdb_options.garbage_collection_cutoff = 1.0;
494da23a
TL
760 Options db_options;
761
494da23a 762 int files_scheduled_to_delete = 0;
f67539c2
TL
763 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
764 "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
765 assert(arg);
766 const std::string *const file_path =
767 static_cast<const std::string *>(arg);
768 if (file_path->find(".blob") != std::string::npos) {
769 ++files_scheduled_to_delete;
770 }
771 });
494da23a
TL
772 SyncPoint::GetInstance()->EnableProcessing();
773 db_options.sst_file_manager = sst_file_manager;
774
775 Open(bdb_options, db_options);
776
777 // Create one obselete file and clean it.
778 blob_db_->Put(WriteOptions(), "foo", "bar");
779 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
780 ASSERT_EQ(1, blob_files.size());
781 std::shared_ptr<BlobFile> bfile = blob_files[0];
782 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
f67539c2 783 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
494da23a
TL
784 blob_db_impl()->TEST_DeleteObsoleteFiles();
785
786 // Even if SSTFileManager is not set, DB is creating a dummy one.
787 ASSERT_EQ(1, files_scheduled_to_delete);
494da23a
TL
788 Destroy();
789 // Make sure that DestroyBlobDB() also goes through delete scheduler.
f67539c2 790 ASSERT_EQ(2, files_scheduled_to_delete);
494da23a
TL
791 SyncPoint::GetInstance()->DisableProcessing();
792 sfm->WaitForEmptyTrash();
793}
794
795TEST_F(BlobDBTest, SstFileManagerRestart) {
494da23a 796 int files_scheduled_to_delete = 0;
f67539c2
TL
797 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
798 "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
799 assert(arg);
800 const std::string *const file_path =
801 static_cast<const std::string *>(arg);
802 if (file_path->find(".blob") != std::string::npos) {
803 ++files_scheduled_to_delete;
804 }
805 });
494da23a
TL
806
807 // run the same test for Get(), MultiGet() and Iterator each.
808 std::shared_ptr<SstFileManager> sst_file_manager(
809 NewSstFileManager(mock_env_.get()));
810 sst_file_manager->SetDeleteRateBytesPerSecond(1);
811 SstFileManagerImpl *sfm =
812 static_cast<SstFileManagerImpl *>(sst_file_manager.get());
813
814 BlobDBOptions bdb_options;
815 bdb_options.min_blob_size = 0;
816 Options db_options;
817
818 SyncPoint::GetInstance()->EnableProcessing();
819 db_options.sst_file_manager = sst_file_manager;
820
821 Open(bdb_options, db_options);
822 std::string blob_dir = blob_db_impl()->TEST_blob_dir();
823 blob_db_->Put(WriteOptions(), "foo", "bar");
824 Close();
825
826 // Create 3 dummy trash files under the blob_dir
f67539c2
TL
827 LegacyFileSystemWrapper fs(db_options.env);
828 CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false);
829 CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true);
830 CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false);
494da23a
TL
831
832 // Make sure that reopening the DB rescan the existing trash files
833 Open(bdb_options, db_options);
f67539c2 834 ASSERT_EQ(files_scheduled_to_delete, 2);
494da23a
TL
835
836 sfm->WaitForEmptyTrash();
837
838 // There should be exact one file under the blob dir now.
839 std::vector<std::string> all_files;
840 ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
841 int nfiles = 0;
842 for (const auto &f : all_files) {
843 assert(!f.empty());
844 if (f[0] == '.') {
845 continue;
846 }
847 nfiles++;
848 }
849 ASSERT_EQ(nfiles, 1);
850
851 SyncPoint::GetInstance()->DisableProcessing();
852}
853
11fdf7f2
TL
854TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
855 BlobDBOptions bdb_options;
856 bdb_options.min_blob_size = 0;
f67539c2
TL
857 bdb_options.enable_garbage_collection = true;
858 bdb_options.garbage_collection_cutoff = 1.0;
11fdf7f2 859 bdb_options.disable_background_tasks = true;
f67539c2 860
11fdf7f2
TL
861 // i = when to take snapshot
862 for (int i = 0; i < 4; i++) {
f67539c2
TL
863 Destroy();
864 Open(bdb_options);
865
866 const Snapshot *snapshot = nullptr;
867
868 // First file
869 ASSERT_OK(Put("key1", "value"));
870 if (i == 0) {
871 snapshot = blob_db_->GetSnapshot();
872 }
873
874 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
875 ASSERT_EQ(1, blob_files.size());
876 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
877
878 // Second file
879 ASSERT_OK(Put("key2", "value"));
880 if (i == 1) {
881 snapshot = blob_db_->GetSnapshot();
882 }
883
884 blob_files = blob_db_impl()->TEST_GetBlobFiles();
885 ASSERT_EQ(2, blob_files.size());
886 auto bfile = blob_files[1];
887 ASSERT_FALSE(bfile->Immutable());
888 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
889
890 // Third file
891 ASSERT_OK(Put("key3", "value"));
892 if (i == 2) {
893 snapshot = blob_db_->GetSnapshot();
894 }
895
896 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
897 ASSERT_TRUE(bfile->Obsolete());
898 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
899 bfile->GetObsoleteSequence());
900
901 Delete("key2");
902 if (i == 3) {
903 snapshot = blob_db_->GetSnapshot();
904 }
905
906 ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
907 blob_db_impl()->TEST_DeleteObsoleteFiles();
908
909 if (i >= 2) {
910 // The snapshot shouldn't see data in bfile
911 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
912 blob_db_->ReleaseSnapshot(snapshot);
913 } else {
914 // The snapshot will see data in bfile, so the file shouldn't be deleted
915 ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
916 blob_db_->ReleaseSnapshot(snapshot);
11fdf7f2 917 blob_db_impl()->TEST_DeleteObsoleteFiles();
f67539c2 918 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
11fdf7f2
TL
919 }
920 }
921}
922
923TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
924 Options options;
925 options.env = mock_env_.get();
926 mock_env_->set_current_time(0);
927 Open(BlobDBOptions(), options);
928 ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
929 ColumnFamilyHandle *handle = nullptr;
7c673cae 930 std::string value;
11fdf7f2
TL
931 std::vector<std::string> values;
932 // The call simply pass through to base db. It should succeed.
933 ASSERT_OK(
934 blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
935 ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
936 ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
937 .IsNotSupported());
938 ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
939 .IsNotSupported());
940 WriteBatch batch;
941 batch.Put("k1", "v1");
942 batch.Put(handle, "k2", "v2");
943 ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
944 ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
945 ASSERT_TRUE(
946 blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
947 auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
948 {"k1", "k2"}, &values);
949 ASSERT_EQ(2, statuses.size());
950 ASSERT_TRUE(statuses[0].IsNotSupported());
951 ASSERT_TRUE(statuses[1].IsNotSupported());
952 ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
953 delete handle;
954}
955
956TEST_F(BlobDBTest, GetLiveFilesMetaData) {
957 Random rnd(301);
20effc67 958
11fdf7f2
TL
959 BlobDBOptions bdb_options;
960 bdb_options.blob_dir = "blob_dir";
961 bdb_options.path_relative = true;
20effc67 962 bdb_options.ttl_range_secs = 10;
11fdf7f2
TL
963 bdb_options.min_blob_size = 0;
964 bdb_options.disable_background_tasks = true;
20effc67
TL
965
966 Options options;
967 options.env = mock_env_.get();
968
969 Open(bdb_options, options);
970
11fdf7f2
TL
971 std::map<std::string, std::string> data;
972 for (size_t i = 0; i < 100; i++) {
973 PutRandom("key" + ToString(i), &rnd, &data);
974 }
20effc67
TL
975
976 constexpr uint64_t expiration = 1000ULL;
977 PutRandomUntil("key100", expiration, &rnd, &data);
978
11fdf7f2
TL
979 std::vector<LiveFileMetaData> metadata;
980 blob_db_->GetLiveFilesMetaData(&metadata);
20effc67
TL
981
982 ASSERT_EQ(2U, metadata.size());
11fdf7f2 983 // Path should be relative to db_name, but begin with slash.
20effc67
TL
984 const std::string filename1("/blob_dir/000001.blob");
985 ASSERT_EQ(filename1, metadata[0].name);
f67539c2 986 ASSERT_EQ(1, metadata[0].file_number);
20effc67
TL
987 ASSERT_EQ(0, metadata[0].oldest_ancester_time);
988 ASSERT_EQ(kDefaultColumnFamilyName, metadata[0].column_family_name);
989
990 const std::string filename2("/blob_dir/000002.blob");
991 ASSERT_EQ(filename2, metadata[1].name);
992 ASSERT_EQ(2, metadata[1].file_number);
993 ASSERT_EQ(expiration, metadata[1].oldest_ancester_time);
994 ASSERT_EQ(kDefaultColumnFamilyName, metadata[1].column_family_name);
995
11fdf7f2
TL
996 std::vector<std::string> livefile;
997 uint64_t mfs;
998 ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
20effc67
TL
999 ASSERT_EQ(5U, livefile.size());
1000 ASSERT_EQ(filename1, livefile[3]);
1001 ASSERT_EQ(filename2, livefile[4]);
11fdf7f2
TL
1002 VerifyDB(data);
1003}
1004
1005TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
1006 constexpr size_t kNumKey = 20;
1007 constexpr size_t kNumIteration = 10;
1008 Random rnd(301);
1009 std::map<std::string, std::string> data;
1010 std::vector<bool> is_blob(kNumKey, false);
1011
1012 // Write to plain rocksdb.
1013 Options options;
1014 options.create_if_missing = true;
1015 DB *db = nullptr;
1016 ASSERT_OK(DB::Open(options, dbname_, &db));
1017 for (size_t i = 0; i < kNumIteration; i++) {
1018 auto key_index = rnd.Next() % kNumKey;
1019 std::string key = "key" + ToString(key_index);
1020 PutRandom(db, key, &rnd, &data);
1021 }
1022 VerifyDB(db, data);
1023 delete db;
1024 db = nullptr;
1025
1026 // Open as blob db. Verify it can read existing data.
1027 Open();
1028 VerifyDB(blob_db_, data);
1029 for (size_t i = 0; i < kNumIteration; i++) {
1030 auto key_index = rnd.Next() % kNumKey;
1031 std::string key = "key" + ToString(key_index);
1032 is_blob[key_index] = true;
1033 PutRandom(blob_db_, key, &rnd, &data);
1034 }
1035 VerifyDB(blob_db_, data);
1036 delete blob_db_;
1037 blob_db_ = nullptr;
1038
1039 // Verify plain db return error for keys written by blob db.
1040 ASSERT_OK(DB::Open(options, dbname_, &db));
1041 std::string value;
1042 for (size_t i = 0; i < kNumKey; i++) {
1043 std::string key = "key" + ToString(i);
1044 Status s = db->Get(ReadOptions(), key, &value);
1045 if (data.count(key) == 0) {
1046 ASSERT_TRUE(s.IsNotFound());
1047 } else if (is_blob[i]) {
20effc67 1048 ASSERT_TRUE(s.IsCorruption());
11fdf7f2
TL
1049 } else {
1050 ASSERT_OK(s);
1051 ASSERT_EQ(data[key], value);
1052 }
1053 }
1054 delete db;
1055}
1056
1057// Test to verify that a NoSpace IOError Status is returned on reaching
1058// max_db_size limit.
1059TEST_F(BlobDBTest, OutOfSpace) {
1060 // Use mock env to stop wall clock.
1061 Options options;
1062 options.env = mock_env_.get();
1063 BlobDBOptions bdb_options;
1064 bdb_options.max_db_size = 200;
1065 bdb_options.is_fifo = false;
1066 bdb_options.disable_background_tasks = true;
1067 Open(bdb_options);
1068
1069 // Each stored blob has an overhead of about 42 bytes currently.
1070 // So a small key + a 100 byte blob should take up ~150 bytes in the db.
1071 std::string value(100, 'v');
1072 ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
1073
1074 // Putting another blob should fail as ading it would exceed the max_db_size
1075 // limit.
1076 Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
1077 ASSERT_TRUE(s.IsIOError());
1078 ASSERT_TRUE(s.IsNoSpace());
1079}
1080
1081TEST_F(BlobDBTest, FIFOEviction) {
1082 BlobDBOptions bdb_options;
1083 bdb_options.max_db_size = 200;
1084 bdb_options.blob_file_size = 100;
1085 bdb_options.is_fifo = true;
1086 bdb_options.disable_background_tasks = true;
1087 Open(bdb_options);
1088
1089 std::atomic<int> evict_count{0};
1090 SyncPoint::GetInstance()->SetCallBack(
1091 "BlobDBImpl::EvictOldestBlobFile:Evicted",
1092 [&](void *) { evict_count++; });
1093 SyncPoint::GetInstance()->EnableProcessing();
1094
1095 // Each stored blob has an overhead of 32 bytes currently.
1096 // So a 100 byte blob should take up 132 bytes.
1097 std::string value(100, 'v');
1098 ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
1099 VerifyDB({{"key1", value}});
1100
1101 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1102
1103 // Adding another 100 bytes blob would take the total size to 264 bytes
1104 // (2*132). max_db_size will be exceeded
1105 // than max_db_size and trigger FIFO eviction.
1106 ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
1107 ASSERT_EQ(1, evict_count);
1108 // key1 will exist until corresponding file be deleted.
1109 VerifyDB({{"key1", value}, {"key2", value}});
1110
1111 // Adding another 100 bytes blob without TTL.
1112 ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
1113 ASSERT_EQ(2, evict_count);
1114 // key1 and key2 will exist until corresponding file be deleted.
1115 VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
1116
1117 // The fourth blob file, without TTL.
1118 ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
1119 ASSERT_EQ(3, evict_count);
1120 VerifyDB(
1121 {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
1122
1123 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1124 ASSERT_EQ(4, blob_files.size());
1125 ASSERT_TRUE(blob_files[0]->Obsolete());
1126 ASSERT_TRUE(blob_files[1]->Obsolete());
1127 ASSERT_TRUE(blob_files[2]->Obsolete());
1128 ASSERT_FALSE(blob_files[3]->Obsolete());
1129 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1130 ASSERT_EQ(3, obsolete_files.size());
1131 ASSERT_EQ(blob_files[0], obsolete_files[0]);
1132 ASSERT_EQ(blob_files[1], obsolete_files[1]);
1133 ASSERT_EQ(blob_files[2], obsolete_files[2]);
1134
1135 blob_db_impl()->TEST_DeleteObsoleteFiles();
1136 obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1137 ASSERT_TRUE(obsolete_files.empty());
1138 VerifyDB({{"key4", value}});
1139}
1140
1141TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
1142 Options options;
1143 BlobDBOptions bdb_options;
1144 bdb_options.max_db_size = 1000;
1145 bdb_options.blob_file_size = 5000;
1146 bdb_options.is_fifo = true;
1147 bdb_options.disable_background_tasks = true;
1148 Open(bdb_options);
1149
1150 std::atomic<int> evict_count{0};
1151 SyncPoint::GetInstance()->SetCallBack(
1152 "BlobDBImpl::EvictOldestBlobFile:Evicted",
1153 [&](void *) { evict_count++; });
1154 SyncPoint::GetInstance()->EnableProcessing();
1155
1156 std::string value(2000, 'v');
1157 ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
1158 ASSERT_EQ(0, evict_count);
7c673cae
FG
1159}
1160
11fdf7f2
TL
1161TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
1162 BlobDBOptions bdb_options;
1163 bdb_options.is_fifo = true;
1164 bdb_options.min_blob_size = 100;
1165 bdb_options.disable_background_tasks = true;
1166 Options options;
1167 // Use mock env to stop wall clock.
1168 options.env = mock_env_.get();
1169 options.disable_auto_compactions = true;
1170 auto statistics = CreateDBStatistics();
1171 options.statistics = statistics;
1172 Open(bdb_options, options);
1173
1174 ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
1175 std::string small_value(50, 'v');
1176 std::map<std::string, std::string> data;
1177 // Insert some data into LSM tree to make sure FIFO eviction take SST
1178 // file size into account.
1179 for (int i = 0; i < 1000; i++) {
1180 ASSERT_OK(Put("key" + ToString(i), small_value, &data));
1181 }
1182 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1183 uint64_t live_sst_size = 0;
1184 ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
1185 &live_sst_size));
1186 ASSERT_TRUE(live_sst_size > 0);
1187 ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
1188
1189 bdb_options.max_db_size = live_sst_size + 2000;
1190 Reopen(bdb_options, options);
1191 ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
1192
1193 std::string value_1k(1000, 'v');
1194 ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
1195 ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1196 VerifyDB(data);
1197 // large_key2 evicts large_key1
1198 ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
1199 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1200 blob_db_impl()->TEST_DeleteObsoleteFiles();
1201 data.erase("large_key1");
1202 VerifyDB(data);
1203 // large_key3 get no enough space even after evicting large_key2, so it
1204 // instead return no space error.
1205 std::string value_2k(2000, 'v');
1206 ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
1207 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1208 // Verify large_key2 still exists.
1209 VerifyDB(data);
1210}
1211
1212// Test flush or compaction will trigger FIFO eviction since they update
1213// total SST file size.
1214TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
1215 BlobDBOptions bdb_options;
1216 bdb_options.max_db_size = 1000;
1217 bdb_options.is_fifo = true;
1218 bdb_options.min_blob_size = 100;
1219 bdb_options.disable_background_tasks = true;
1220 Options options;
1221 // Use mock env to stop wall clock.
1222 options.env = mock_env_.get();
1223 auto statistics = CreateDBStatistics();
1224 options.statistics = statistics;
1225 options.compression = kNoCompression;
1226 Open(bdb_options, options);
1227
1228 std::string value(800, 'v');
1229 ASSERT_OK(PutWithTTL("large_key", value, 60));
1230 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1231 ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1232 VerifyDB({{"large_key", value}});
1233
1234 // Insert some small keys and flush to bring DB out of space.
1235 std::map<std::string, std::string> data;
1236 for (int i = 0; i < 10; i++) {
1237 ASSERT_OK(Put("key" + ToString(i), "v", &data));
1238 }
1239 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1240
1241 // Verify large_key is deleted by FIFO eviction.
1242 blob_db_impl()->TEST_DeleteObsoleteFiles();
1243 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1244 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1245 VerifyDB(data);
1246}
1247
1248TEST_F(BlobDBTest, InlineSmallValues) {
1249 constexpr uint64_t kMaxExpiration = 1000;
1250 Random rnd(301);
1251 BlobDBOptions bdb_options;
1252 bdb_options.ttl_range_secs = kMaxExpiration;
1253 bdb_options.min_blob_size = 100;
1254 bdb_options.blob_file_size = 256 * 1000 * 1000;
1255 bdb_options.disable_background_tasks = true;
1256 Options options;
1257 options.env = mock_env_.get();
1258 mock_env_->set_current_time(0);
1259 Open(bdb_options, options);
1260 std::map<std::string, std::string> data;
1261 std::map<std::string, KeyVersion> versions;
1262 for (size_t i = 0; i < 1000; i++) {
1263 bool is_small_value = rnd.Next() % 2;
1264 bool has_ttl = rnd.Next() % 2;
1265 uint64_t expiration = rnd.Next() % kMaxExpiration;
1266 int len = is_small_value ? 50 : 200;
1267 std::string key = "key" + ToString(i);
20effc67 1268 std::string value = rnd.HumanReadableString(len);
11fdf7f2
TL
1269 std::string blob_index;
1270 data[key] = value;
1271 SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1272 if (!has_ttl) {
1273 ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
1274 } else {
1275 ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
1276 }
1277 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1278 versions[key] =
1279 KeyVersion(key, value, sequence,
1280 (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
1281 }
1282 VerifyDB(data);
1283 VerifyBaseDB(versions);
1284 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
1285 auto blob_files = bdb_impl->TEST_GetBlobFiles();
1286 ASSERT_EQ(2, blob_files.size());
1287 std::shared_ptr<BlobFile> non_ttl_file;
1288 std::shared_ptr<BlobFile> ttl_file;
1289 if (blob_files[0]->HasTTL()) {
1290 ttl_file = blob_files[0];
1291 non_ttl_file = blob_files[1];
1292 } else {
1293 non_ttl_file = blob_files[0];
1294 ttl_file = blob_files[1];
1295 }
1296 ASSERT_FALSE(non_ttl_file->HasTTL());
1297 ASSERT_TRUE(ttl_file->HasTTL());
1298}
1299
20effc67
TL
1300TEST_F(BlobDBTest, UserCompactionFilter) {
1301 class CustomerFilter : public CompactionFilter {
1302 public:
1303 bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
1304 std::string *new_value, bool *value_changed) const override {
1305 *value_changed = false;
1306 // changing value size to test value transitions between inlined data
1307 // and stored-in-blob data
1308 if (value.size() % 4 == 1) {
1309 *new_value = value.ToString();
1310 // double size by duplicating value
1311 *new_value += *new_value;
1312 *value_changed = true;
1313 return false;
1314 } else if (value.size() % 3 == 1) {
1315 *new_value = value.ToString();
1316 // trancate value size by half
1317 *new_value = new_value->substr(0, new_value->size() / 2);
1318 *value_changed = true;
1319 return false;
1320 } else if (value.size() % 2 == 1) {
1321 return true;
1322 }
1323 return false;
1324 }
1325 bool IgnoreSnapshots() const override { return true; }
1326 const char *Name() const override { return "CustomerFilter"; }
11fdf7f2 1327 };
20effc67
TL
1328 class CustomerFilterFactory : public CompactionFilterFactory {
1329 const char *Name() const override { return "CustomerFilterFactory"; }
494da23a
TL
1330 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
1331 const CompactionFilter::Context & /*context*/) override {
20effc67 1332 return std::unique_ptr<CompactionFilter>(new CustomerFilter());
11fdf7f2
TL
1333 }
1334 };
20effc67
TL
1335
1336 constexpr size_t kNumPuts = 1 << 10;
1337 // Generate both inlined and blob value
1338 constexpr uint64_t kMinValueSize = 1 << 6;
1339 constexpr uint64_t kMaxValueSize = 1 << 8;
1340 constexpr uint64_t kMinBlobSize = 1 << 7;
1341 static_assert(kMinValueSize < kMinBlobSize, "");
1342 static_assert(kMaxValueSize > kMinBlobSize, "");
1343
1344 BlobDBOptions bdb_options;
1345 bdb_options.min_blob_size = kMinBlobSize;
1346 bdb_options.blob_file_size = kMaxValueSize * 10;
1347 bdb_options.disable_background_tasks = true;
1348 if (Snappy_Supported()) {
1349 bdb_options.compression = CompressionType::kSnappyCompression;
1350 }
1351 // case_num == 0: Test user defined compaction filter
1352 // case_num == 1: Test user defined compaction filter factory
1353 for (int case_num = 0; case_num < 2; case_num++) {
11fdf7f2 1354 Options options;
20effc67
TL
1355 if (case_num == 0) {
1356 options.compaction_filter = new CustomerFilter();
11fdf7f2 1357 } else {
20effc67
TL
1358 options.compaction_filter_factory.reset(new CustomerFilterFactory());
1359 }
1360 options.disable_auto_compactions = true;
1361 options.env = mock_env_.get();
1362 options.statistics = CreateDBStatistics();
1363 Open(bdb_options, options);
1364
1365 std::map<std::string, std::string> data;
1366 std::map<std::string, std::string> data_after_compact;
1367 Random rnd(301);
1368 uint64_t value_size = kMinValueSize;
1369 int drop_record = 0;
1370 for (size_t i = 0; i < kNumPuts; ++i) {
1371 std::ostringstream oss;
1372 oss << "key" << std::setw(4) << std::setfill('0') << i;
1373
1374 const std::string key(oss.str());
1375 const std::string value = rnd.HumanReadableString((int)value_size);
1376 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1377
1378 ASSERT_OK(Put(key, value));
1379 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1380
1381 data[key] = value;
1382 if (value.length() % 4 == 1) {
1383 data_after_compact[key] = value + value;
1384 } else if (value.length() % 3 == 1) {
1385 data_after_compact[key] = value.substr(0, value.size() / 2);
1386 } else if (value.length() % 2 == 1) {
1387 ++drop_record;
1388 } else {
1389 data_after_compact[key] = value;
1390 }
1391
1392 if (++value_size > kMaxValueSize) {
1393 value_size = kMinValueSize;
1394 }
11fdf7f2 1395 }
20effc67
TL
1396 // Verify full data set
1397 VerifyDB(data);
1398 // Applying compaction filter for records
1399 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1400 // Verify data after compaction, only value with even length left.
1401 VerifyDB(data_after_compact);
1402 ASSERT_EQ(drop_record,
1403 options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
11fdf7f2 1404 delete options.compaction_filter;
20effc67
TL
1405 Destroy();
1406 }
1407}
1408
1409// Test user comapction filter when there is IO error on blob data.
1410TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
1411 class CustomerFilter : public CompactionFilter {
1412 public:
1413 bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
1414 std::string *new_value, bool *value_changed) const override {
1415 *new_value = value.ToString() + "_new";
1416 *value_changed = true;
1417 return false;
1418 }
1419 bool IgnoreSnapshots() const override { return true; }
1420 const char *Name() const override { return "CustomerFilter"; }
1421 };
1422
1423 constexpr size_t kNumPuts = 100;
1424 constexpr int kValueSize = 100;
1425
1426 BlobDBOptions bdb_options;
1427 bdb_options.min_blob_size = 0;
1428 bdb_options.blob_file_size = kValueSize * 10;
1429 bdb_options.disable_background_tasks = true;
1430 bdb_options.compression = CompressionType::kNoCompression;
1431
1432 std::vector<std::string> io_failure_cases = {
1433 "BlobDBImpl::CreateBlobFileAndWriter",
1434 "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
1435 "BlobDBImpl::CloseBlobFile"};
1436
1437 for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
1438 Options options;
1439 options.compaction_filter = new CustomerFilter();
1440 options.disable_auto_compactions = true;
1441 options.env = fault_injection_env_.get();
1442 options.statistics = CreateDBStatistics();
1443 Open(bdb_options, options);
1444
1445 std::map<std::string, std::string> data;
1446 Random rnd(301);
1447 for (size_t i = 0; i < kNumPuts; ++i) {
1448 std::ostringstream oss;
1449 oss << "key" << std::setw(4) << std::setfill('0') << i;
1450
1451 const std::string key(oss.str());
1452 const std::string value = rnd.HumanReadableString(kValueSize);
1453 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1454
1455 ASSERT_OK(Put(key, value));
1456 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1457 data[key] = value;
1458 }
1459
1460 // Verify full data set
1461 VerifyDB(data);
1462
1463 SyncPoint::GetInstance()->SetCallBack(
1464 io_failure_cases[case_num], [&](void * /*arg*/) {
1465 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
1466 });
1467 SyncPoint::GetInstance()->EnableProcessing();
1468 auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1469 ASSERT_TRUE(s.IsIOError());
1470
1471 // Reactivate file system to allow test to verify and close DB.
1472 fault_injection_env_->SetFilesystemActive(true);
1473 SyncPoint::GetInstance()->DisableProcessing();
1474 SyncPoint::GetInstance()->ClearAllCallBacks();
1475
1476 // Verify full data set after compaction failure
1477 VerifyDB(data);
1478
1479 delete options.compaction_filter;
1480 Destroy();
11fdf7f2
TL
1481 }
1482}
1483
1484// Test comapction filter should remove any expired blob index.
1485TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
1486 constexpr size_t kNumKeys = 100;
1487 constexpr size_t kNumPuts = 1000;
1488 constexpr uint64_t kMaxExpiration = 1000;
1489 constexpr uint64_t kCompactTime = 500;
1490 constexpr uint64_t kMinBlobSize = 100;
1491 Random rnd(301);
1492 mock_env_->set_current_time(0);
1493 BlobDBOptions bdb_options;
1494 bdb_options.min_blob_size = kMinBlobSize;
1495 bdb_options.disable_background_tasks = true;
1496 Options options;
1497 options.env = mock_env_.get();
1498 Open(bdb_options, options);
1499
1500 std::map<std::string, std::string> data;
1501 std::map<std::string, std::string> data_after_compact;
1502 for (size_t i = 0; i < kNumPuts; i++) {
1503 bool is_small_value = rnd.Next() % 2;
1504 bool has_ttl = rnd.Next() % 2;
1505 uint64_t expiration = rnd.Next() % kMaxExpiration;
1506 int len = is_small_value ? 10 : 200;
1507 std::string key = "key" + ToString(rnd.Next() % kNumKeys);
20effc67 1508 std::string value = rnd.HumanReadableString(len);
11fdf7f2
TL
1509 if (!has_ttl) {
1510 if (is_small_value) {
1511 std::string blob_entry;
1512 BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
1513 // Fake blob index with TTL. See what it will do.
1514 ASSERT_GT(kMinBlobSize, blob_entry.size());
1515 value = blob_entry;
1516 }
1517 ASSERT_OK(Put(key, value));
1518 data_after_compact[key] = value;
1519 } else {
1520 ASSERT_OK(PutUntil(key, value, expiration));
1521 if (expiration <= kCompactTime) {
1522 data_after_compact.erase(key);
1523 } else {
1524 data_after_compact[key] = value;
1525 }
1526 }
1527 data[key] = value;
1528 }
1529 VerifyDB(data);
1530
1531 mock_env_->set_current_time(kCompactTime);
1532 // Take a snapshot before compaction. Make sure expired blob indexes is
1533 // filtered regardless of snapshot.
1534 const Snapshot *snapshot = blob_db_->GetSnapshot();
1535 // Issue manual compaction to trigger compaction filter.
f67539c2 1536 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
11fdf7f2
TL
1537 blob_db_->ReleaseSnapshot(snapshot);
1538 // Verify expired blob index are filtered.
1539 std::vector<KeyVersion> versions;
1540 const size_t kMaxKeys = 10000;
1541 GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
1542 ASSERT_EQ(data_after_compact.size(), versions.size());
1543 for (auto &version : versions) {
1544 ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
1545 }
1546 VerifyDB(data_after_compact);
1547}
1548
1549// Test compaction filter should remove any blob index where corresponding
f67539c2 1550// blob file has been removed.
11fdf7f2
TL
1551TEST_F(BlobDBTest, FilterFileNotAvailable) {
1552 BlobDBOptions bdb_options;
1553 bdb_options.min_blob_size = 0;
1554 bdb_options.disable_background_tasks = true;
1555 Options options;
1556 options.disable_auto_compactions = true;
1557 Open(bdb_options, options);
1558
1559 ASSERT_OK(Put("foo", "v1"));
1560 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1561 ASSERT_EQ(1, blob_files.size());
1562 ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
1563 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
1564
1565 ASSERT_OK(Put("bar", "v2"));
1566 blob_files = blob_db_impl()->TEST_GetBlobFiles();
1567 ASSERT_EQ(2, blob_files.size());
1568 ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
1569 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
1570
1571 const size_t kMaxKeys = 10000;
1572
1573 DB *base_db = blob_db_->GetRootDB();
1574 std::vector<KeyVersion> versions;
1575 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1576 ASSERT_EQ(2, versions.size());
1577 ASSERT_EQ("bar", versions[0].user_key);
1578 ASSERT_EQ("foo", versions[1].user_key);
1579 VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
1580
11fdf7f2
TL
1581 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1582 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1583 ASSERT_EQ(2, versions.size());
1584 ASSERT_EQ("bar", versions[0].user_key);
1585 ASSERT_EQ("foo", versions[1].user_key);
1586 VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
1587
1588 // Remove the first blob file and compact. foo should be remove from base db.
1589 blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
1590 blob_db_impl()->TEST_DeleteObsoleteFiles();
1591 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1592 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1593 ASSERT_EQ(1, versions.size());
1594 ASSERT_EQ("bar", versions[0].user_key);
1595 VerifyDB({{"bar", "v2"}});
1596
1597 // Remove the second blob file and compact. bar should be remove from base db.
1598 blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
1599 blob_db_impl()->TEST_DeleteObsoleteFiles();
1600 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1601 ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
1602 ASSERT_EQ(0, versions.size());
1603 VerifyDB({});
1604}
1605
1606// Test compaction filter should filter any inlined TTL keys that would have
1607// been dropped by last FIFO eviction if they are store out-of-line.
1608TEST_F(BlobDBTest, FilterForFIFOEviction) {
1609 Random rnd(215);
1610 BlobDBOptions bdb_options;
1611 bdb_options.min_blob_size = 100;
1612 bdb_options.ttl_range_secs = 60;
1613 bdb_options.max_db_size = 0;
1614 bdb_options.disable_background_tasks = true;
1615 Options options;
1616 // Use mock env to stop wall clock.
1617 mock_env_->set_current_time(0);
1618 options.env = mock_env_.get();
1619 auto statistics = CreateDBStatistics();
1620 options.statistics = statistics;
1621 options.disable_auto_compactions = true;
1622 Open(bdb_options, options);
1623
1624 std::map<std::string, std::string> data;
1625 std::map<std::string, std::string> data_after_compact;
1626 // Insert some small values that will be inlined.
1627 for (int i = 0; i < 1000; i++) {
1628 std::string key = "key" + ToString(i);
20effc67 1629 std::string value = rnd.HumanReadableString(50);
11fdf7f2
TL
1630 uint64_t ttl = rnd.Next() % 120 + 1;
1631 ASSERT_OK(PutWithTTL(key, value, ttl, &data));
1632 if (ttl >= 60) {
1633 data_after_compact[key] = value;
1634 }
1635 }
1636 uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
1637 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1638 uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
1639 ASSERT_GT(live_sst_size, 0);
1640 VerifyDB(data);
1641
1642 bdb_options.max_db_size = live_sst_size + 30000;
1643 bdb_options.is_fifo = true;
1644 Reopen(bdb_options, options);
1645 VerifyDB(data);
1646
1647 // Put two large values, each on a different blob file.
1648 std::string large_value(10000, 'v');
1649 ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
1650 ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
1651 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1652 ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1653 data["large_key1"] = large_value;
1654 data["large_key2"] = large_value;
1655 VerifyDB(data);
1656
1657 // Put a third large value which will bring the DB out of space.
1658 // FIFO eviction will evict the file of large_key1.
1659 ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
1660 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1661 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1662 blob_db_impl()->TEST_DeleteObsoleteFiles();
1663 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1664 data.erase("large_key1");
1665 data["large_key3"] = large_value;
1666 VerifyDB(data);
1667
1668 // Putting some more small values. These values shouldn't be evicted by
1669 // compaction filter since they are inserted after FIFO eviction.
1670 ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
1671 ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
1672
1673 // FIFO eviction doesn't trigger again since there enough room for the flush.
1674 ASSERT_OK(blob_db_->Flush(FlushOptions()));
1675 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1676
1677 // Manual compact and check if compaction filter evict those keys with
1678 // expiration < 60.
1679 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1680 // All keys with expiration < 60, plus large_key1 is filtered by
1681 // compaction filter.
1682 ASSERT_EQ(num_keys_to_evict + 1,
1683 statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
1684 ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1685 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1686 data_after_compact["large_key2"] = large_value;
1687 data_after_compact["large_key3"] = large_value;
1688 VerifyDB(data_after_compact);
1689}
1690
f67539c2
TL
1691TEST_F(BlobDBTest, GarbageCollection) {
1692 constexpr size_t kNumPuts = 1 << 10;
1693
1694 constexpr uint64_t kExpiration = 1000;
1695 constexpr uint64_t kCompactTime = 500;
1696
1697 constexpr uint64_t kKeySize = 7; // "key" + 4 digits
1698
1699 constexpr uint64_t kSmallValueSize = 1 << 6;
1700 constexpr uint64_t kLargeValueSize = 1 << 8;
1701 constexpr uint64_t kMinBlobSize = 1 << 7;
1702 static_assert(kSmallValueSize < kMinBlobSize, "");
1703 static_assert(kLargeValueSize > kMinBlobSize, "");
1704
1705 constexpr size_t kBlobsPerFile = 8;
1706 constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
1707 constexpr uint64_t kBlobFileSize =
1708 BlobLogHeader::kSize +
1709 (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
1710
1711 BlobDBOptions bdb_options;
1712 bdb_options.min_blob_size = kMinBlobSize;
1713 bdb_options.blob_file_size = kBlobFileSize;
1714 bdb_options.enable_garbage_collection = true;
1715 bdb_options.garbage_collection_cutoff = 0.25;
1716 bdb_options.disable_background_tasks = true;
1717
1718 Options options;
1719 options.env = mock_env_.get();
1720 options.statistics = CreateDBStatistics();
1721
1722 Open(bdb_options, options);
1723
1724 std::map<std::string, std::string> data;
1725 std::map<std::string, KeyVersion> blob_value_versions;
1726 std::map<std::string, BlobIndexVersion> blob_index_versions;
1727
1728 Random rnd(301);
1729
1730 // Add a bunch of large non-TTL values. These will be written to non-TTL
1731 // blob files and will be subject to GC.
1732 for (size_t i = 0; i < kNumPuts; ++i) {
1733 std::ostringstream oss;
1734 oss << "key" << std::setw(4) << std::setfill('0') << i;
1735
1736 const std::string key(oss.str());
20effc67 1737 const std::string value = rnd.HumanReadableString(kLargeValueSize);
f67539c2
TL
1738 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1739
1740 ASSERT_OK(Put(key, value));
1741 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1742
1743 data[key] = value;
1744 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1745 blob_index_versions[key] =
1746 BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
1747 sequence, kTypeBlobIndex);
1748 }
1749
1750 // Add some small and/or TTL values that will be ignored during GC.
1751 // First, add a large TTL value will be written to its own TTL blob file.
1752 {
1753 const std::string key("key2000");
20effc67 1754 const std::string value = rnd.HumanReadableString(kLargeValueSize);
f67539c2
TL
1755 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1756
1757 ASSERT_OK(PutUntil(key, value, kExpiration));
1758 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1759
1760 data[key] = value;
1761 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1762 blob_index_versions[key] =
1763 BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
1764 sequence, kTypeBlobIndex);
1765 }
1766
1767 // Now add a small TTL value (which will be inlined).
1768 {
1769 const std::string key("key3000");
20effc67 1770 const std::string value = rnd.HumanReadableString(kSmallValueSize);
f67539c2
TL
1771 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1772
1773 ASSERT_OK(PutUntil(key, value, kExpiration));
1774 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1775
1776 data[key] = value;
1777 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1778 blob_index_versions[key] = BlobIndexVersion(
1779 key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
1780 }
1781
1782 // Finally, add a small non-TTL value (which will be stored as a regular
1783 // value).
1784 {
1785 const std::string key("key4000");
20effc67 1786 const std::string value = rnd.HumanReadableString(kSmallValueSize);
f67539c2
TL
1787 const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1788
1789 ASSERT_OK(Put(key, value));
1790 ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1791
1792 data[key] = value;
1793 blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
1794 blob_index_versions[key] = BlobIndexVersion(
1795 key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
1796 }
1797
1798 VerifyDB(data);
1799 VerifyBaseDB(blob_value_versions);
1800 VerifyBaseDBBlobIndex(blob_index_versions);
1801
1802 // At this point, we should have 128 immutable non-TTL files with file numbers
1803 // 1..128.
1804 {
1805 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1806 ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
1807 for (size_t i = 0; i < kNumBlobFiles; ++i) {
1808 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1809 ASSERT_EQ(live_imm_files[i]->GetFileSize(),
1810 kBlobFileSize + BlobLogFooter::kSize);
1811 }
1812 }
1813
1814 mock_env_->set_current_time(kCompactTime);
1815
1816 ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1817
1818 // We expect the data to remain the same and the blobs from the oldest N files
1819 // to be moved to new files. Sequence numbers get zeroed out during the
1820 // compaction.
1821 VerifyDB(data);
1822
1823 for (auto &pair : blob_value_versions) {
1824 KeyVersion &version = pair.second;
1825 version.sequence = 0;
1826 }
1827
1828 VerifyBaseDB(blob_value_versions);
1829
1830 const uint64_t cutoff = static_cast<uint64_t>(
1831 bdb_options.garbage_collection_cutoff * kNumBlobFiles);
1832 for (auto &pair : blob_index_versions) {
1833 BlobIndexVersion &version = pair.second;
1834
1835 version.sequence = 0;
1836
1837 if (version.file_number == kInvalidBlobFileNumber) {
1838 continue;
1839 }
1840
1841 if (version.file_number > cutoff) {
1842 continue;
1843 }
1844
1845 version.file_number += kNumBlobFiles + 1;
1846 }
1847
1848 VerifyBaseDBBlobIndex(blob_index_versions);
1849
1850 const Statistics *const statistics = options.statistics.get();
1851 assert(statistics);
1852
1853 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
1854 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
1855 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
1856 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
1857 cutoff * kBlobsPerFile);
1858 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
1859 cutoff * kBlobsPerFile * kLargeValueSize);
1860
1861 // At this point, we should have 128 immutable non-TTL files with file numbers
1862 // 33..128 and 130..161. (129 was taken by the TTL blob file.)
1863 {
1864 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1865 ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
1866 for (size_t i = 0; i < kNumBlobFiles; ++i) {
1867 uint64_t expected_file_number = i + cutoff + 1;
1868 if (expected_file_number > kNumBlobFiles) {
1869 ++expected_file_number;
1870 }
1871
1872 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
1873 ASSERT_EQ(live_imm_files[i]->GetFileSize(),
1874 kBlobFileSize + BlobLogFooter::kSize);
1875 }
1876 }
1877}
1878
1879TEST_F(BlobDBTest, GarbageCollectionFailure) {
1880 BlobDBOptions bdb_options;
1881 bdb_options.min_blob_size = 0;
1882 bdb_options.enable_garbage_collection = true;
1883 bdb_options.garbage_collection_cutoff = 1.0;
1884 bdb_options.disable_background_tasks = true;
1885
1886 Options db_options;
1887 db_options.statistics = CreateDBStatistics();
1888
1889 Open(bdb_options, db_options);
1890
1891 // Write a couple of valid blobs.
1892 Put("foo", "bar");
1893 Put("dead", "beef");
1894
1895 // Write a fake blob reference into the base DB that cannot be parsed.
1896 WriteBatch batch;
1897 ASSERT_OK(WriteBatchInternal::PutBlobIndex(
1898 &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
1899 "not a valid blob index"));
1900 ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
1901
1902 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1903 ASSERT_EQ(blob_files.size(), 1);
1904 auto blob_file = blob_files[0];
1905 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
1906
1907 ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
1908 .IsCorruption());
1909
1910 const Statistics *const statistics = db_options.statistics.get();
1911 assert(statistics);
1912
1913 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
1914 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
1915 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
1916 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
1917 ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
1918}
1919
11fdf7f2
TL
1920// File should be evicted after expiration.
1921TEST_F(BlobDBTest, EvictExpiredFile) {
1922 BlobDBOptions bdb_options;
1923 bdb_options.ttl_range_secs = 100;
1924 bdb_options.min_blob_size = 0;
1925 bdb_options.disable_background_tasks = true;
1926 Options options;
1927 options.env = mock_env_.get();
1928 Open(bdb_options, options);
1929 mock_env_->set_current_time(50);
1930 std::map<std::string, std::string> data;
1931 ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
1932 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1933 ASSERT_EQ(1, blob_files.size());
1934 auto blob_file = blob_files[0];
1935 ASSERT_FALSE(blob_file->Immutable());
1936 ASSERT_FALSE(blob_file->Obsolete());
1937 VerifyDB(data);
1938 mock_env_->set_current_time(250);
1939 // The key should expired now.
1940 blob_db_impl()->TEST_EvictExpiredFiles();
1941 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1942 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1943 ASSERT_TRUE(blob_file->Immutable());
1944 ASSERT_TRUE(blob_file->Obsolete());
1945 blob_db_impl()->TEST_DeleteObsoleteFiles();
1946 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1947 ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
1948 // Make sure we don't return garbage value after blob file being evicted,
1949 // but the blob index still exists in the LSM tree.
1950 std::string val = "";
1951 ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
1952 ASSERT_EQ("", val);
1953}
1954
1955TEST_F(BlobDBTest, DisableFileDeletions) {
1956 BlobDBOptions bdb_options;
1957 bdb_options.disable_background_tasks = true;
1958 Open(bdb_options);
1959 std::map<std::string, std::string> data;
1960 for (bool force : {true, false}) {
1961 ASSERT_OK(Put("foo", "v", &data));
1962 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1963 ASSERT_EQ(1, blob_files.size());
1964 auto blob_file = blob_files[0];
1965 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
1966 blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
1967 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1968 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1969 // Call DisableFileDeletions twice.
1970 ASSERT_OK(blob_db_->DisableFileDeletions());
1971 ASSERT_OK(blob_db_->DisableFileDeletions());
1972 // File deletions should be disabled.
1973 blob_db_impl()->TEST_DeleteObsoleteFiles();
1974 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1975 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1976 VerifyDB(data);
1977 // Enable file deletions once. If force=true, file deletion is enabled.
1978 // Otherwise it needs to enable it for a second time.
1979 ASSERT_OK(blob_db_->EnableFileDeletions(force));
1980 blob_db_impl()->TEST_DeleteObsoleteFiles();
1981 if (!force) {
1982 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1983 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1984 VerifyDB(data);
1985 // Call EnableFileDeletions a second time.
1986 ASSERT_OK(blob_db_->EnableFileDeletions(false));
1987 blob_db_impl()->TEST_DeleteObsoleteFiles();
1988 }
1989 // Regardless of value of `force`, file should be deleted by now.
1990 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1991 ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
1992 VerifyDB({});
1993 }
1994}
1995
f67539c2
TL
1996TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
1997 BlobDBOptions bdb_options;
1998 bdb_options.enable_garbage_collection = true;
1999 bdb_options.disable_background_tasks = true;
2000 Open(bdb_options);
2001
2002 // Register some dummy blob files.
2003 blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
2004 blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
2005 blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
2006 blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
2007 blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
2008
2009 // Initialize the blob <-> SST file mapping. First, add some SST files with
2010 // blob file references, then some without.
2011 std::vector<LiveFileMetaData> live_files;
2012
2013 for (uint64_t i = 1; i <= 10; ++i) {
2014 LiveFileMetaData live_file;
2015 live_file.file_number = i;
2016 live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
2017
2018 live_files.emplace_back(live_file);
2019 }
2020
2021 for (uint64_t i = 11; i <= 20; ++i) {
2022 LiveFileMetaData live_file;
2023 live_file.file_number = i;
2024
2025 live_files.emplace_back(live_file);
2026 }
2027
2028 blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
2029
2030 // Check that the blob <-> SST mappings have been correctly initialized.
2031 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2032
2033 ASSERT_EQ(blob_files.size(), 5);
2034
2035 {
2036 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2037 ASSERT_EQ(live_imm_files.size(), 5);
2038 for (size_t i = 0; i < 5; ++i) {
2039 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2040 }
2041
2042 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2043 }
2044
2045 {
2046 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2047 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
2048 const std::vector<bool> expected_obsolete{false, false, false, false,
2049 false};
2050 for (size_t i = 0; i < 5; ++i) {
2051 const auto &blob_file = blob_files[i];
2052 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2053 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2054 }
2055
2056 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2057 ASSERT_EQ(live_imm_files.size(), 5);
2058 for (size_t i = 0; i < 5; ++i) {
2059 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2060 }
2061
2062 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2063 }
2064
2065 // Simulate a flush where the SST does not reference any blob files.
2066 {
2067 FlushJobInfo info{};
2068 info.file_number = 21;
2069 info.smallest_seqno = 1;
2070 info.largest_seqno = 100;
2071
2072 blob_db_impl()->TEST_ProcessFlushJobInfo(info);
2073
2074 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2075 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
2076 const std::vector<bool> expected_obsolete{false, false, false, false,
2077 false};
2078 for (size_t i = 0; i < 5; ++i) {
2079 const auto &blob_file = blob_files[i];
2080 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2081 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2082 }
2083
2084 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2085 ASSERT_EQ(live_imm_files.size(), 5);
2086 for (size_t i = 0; i < 5; ++i) {
2087 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2088 }
2089
2090 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2091 }
2092
2093 // Simulate a flush where the SST references a blob file.
2094 {
2095 FlushJobInfo info{};
2096 info.file_number = 22;
2097 info.oldest_blob_file_number = 5;
2098 info.smallest_seqno = 101;
2099 info.largest_seqno = 200;
2100
2101 blob_db_impl()->TEST_ProcessFlushJobInfo(info);
2102
2103 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2104 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
2105 const std::vector<bool> expected_obsolete{false, false, false, false,
2106 false};
2107 for (size_t i = 0; i < 5; ++i) {
2108 const auto &blob_file = blob_files[i];
2109 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2110 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2111 }
2112
2113 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2114 ASSERT_EQ(live_imm_files.size(), 5);
2115 for (size_t i = 0; i < 5; ++i) {
2116 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
2117 }
2118
2119 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2120 }
2121
2122 // Simulate a compaction. Some inputs and outputs have blob file references,
2123 // some don't. There is also a trivial move (which means the SST appears on
2124 // both the input and the output list). Blob file 1 loses all its linked SSTs,
2125 // and since it got marked immutable at sequence number 200 which has already
2126 // been flushed, it can be marked obsolete.
2127 {
2128 CompactionJobInfo info{};
2129 info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
2130 info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
2131 info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
2132 info.input_file_infos.emplace_back(
2133 CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
2134 info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
2135 info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
2136 info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
2137 info.output_file_infos.emplace_back(
2138 CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
2139
2140 blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
2141
2142 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2143 {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
2144 const std::vector<bool> expected_obsolete{true, false, false, false, false};
2145 for (size_t i = 0; i < 5; ++i) {
2146 const auto &blob_file = blob_files[i];
2147 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2148 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2149 }
2150
2151 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2152 ASSERT_EQ(live_imm_files.size(), 4);
2153 for (size_t i = 0; i < 4; ++i) {
2154 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
2155 }
2156
2157 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2158 ASSERT_EQ(obsolete_files.size(), 1);
2159 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2160 }
2161
2162 // Simulate a failed compaction. No mappings should be updated.
2163 {
2164 CompactionJobInfo info{};
2165 info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
2166 info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
2167 info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
2168 info.status = Status::Corruption();
2169
2170 blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
2171
2172 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2173 {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
2174 const std::vector<bool> expected_obsolete{true, false, false, false, false};
2175 for (size_t i = 0; i < 5; ++i) {
2176 const auto &blob_file = blob_files[i];
2177 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2178 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2179 }
2180
2181 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2182 ASSERT_EQ(live_imm_files.size(), 4);
2183 for (size_t i = 0; i < 4; ++i) {
2184 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
2185 }
2186
2187 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2188 ASSERT_EQ(obsolete_files.size(), 1);
2189 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2190 }
2191
2192 // Simulate another compaction. Blob file 2 loses all its linked SSTs
2193 // but since it got marked immutable at sequence number 300 which hasn't
2194 // been flushed yet, it cannot be marked obsolete at this point.
2195 {
2196 CompactionJobInfo info{};
2197 info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
2198 info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
2199 info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
2200
2201 blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
2202
2203 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2204 {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
2205 const std::vector<bool> expected_obsolete{true, false, false, false, false};
2206 for (size_t i = 0; i < 5; ++i) {
2207 const auto &blob_file = blob_files[i];
2208 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2209 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2210 }
2211
2212 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2213 ASSERT_EQ(live_imm_files.size(), 4);
2214 for (size_t i = 0; i < 4; ++i) {
2215 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
2216 }
2217
2218 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2219 ASSERT_EQ(obsolete_files.size(), 1);
2220 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2221 }
2222
2223 // Simulate a flush with largest sequence number 300. This will make it
2224 // possible to mark blob file 2 obsolete.
2225 {
2226 FlushJobInfo info{};
2227 info.file_number = 26;
2228 info.smallest_seqno = 201;
2229 info.largest_seqno = 300;
2230
2231 blob_db_impl()->TEST_ProcessFlushJobInfo(info);
2232
2233 const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
2234 {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
2235 const std::vector<bool> expected_obsolete{true, true, false, false, false};
2236 for (size_t i = 0; i < 5; ++i) {
2237 const auto &blob_file = blob_files[i];
2238 ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
2239 ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
2240 }
2241
2242 auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2243 ASSERT_EQ(live_imm_files.size(), 3);
2244 for (size_t i = 0; i < 3; ++i) {
2245 ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
2246 }
2247
2248 auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
2249 ASSERT_EQ(obsolete_files.size(), 2);
2250 ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
2251 ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
2252 }
2253}
2254
494da23a
TL
2255TEST_F(BlobDBTest, ShutdownWait) {
2256 BlobDBOptions bdb_options;
2257 bdb_options.ttl_range_secs = 100;
2258 bdb_options.min_blob_size = 0;
2259 bdb_options.disable_background_tasks = false;
2260 Options options;
2261 options.env = mock_env_.get();
2262
2263 SyncPoint::GetInstance()->LoadDependency({
2264 {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
2265 {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
2266 {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
2267 {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
2268 });
2269 // Force all tasks to be scheduled immediately.
20effc67 2270 SyncPoint::GetInstance()->SetCallBack(
494da23a
TL
2271 "TimeQueue::Add:item.end", [&](void *arg) {
2272 std::chrono::steady_clock::time_point *tp =
2273 static_cast<std::chrono::steady_clock::time_point *>(arg);
2274 *tp =
2275 std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
2276 });
2277
20effc67 2278 SyncPoint::GetInstance()->SetCallBack(
494da23a
TL
2279 "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
2280 // Sleep 3 ms to increase the chance of data race.
2281 // We've synced up the code so that EvictExpiredFiles()
2282 // is called concurrently with ~BlobDBImpl().
2283 // ~BlobDBImpl() is supposed to wait for all background
2284 // task to shutdown before doing anything else. In order
2285 // to use the same test to reproduce a bug of the waiting
2286 // logic, we wait a little bit here, so that TSAN can
2287 // catch the data race.
2288 // We should improve the test if we find a better way.
2289 Env::Default()->SleepForMicroseconds(3000);
2290 });
2291
2292 SyncPoint::GetInstance()->EnableProcessing();
2293
2294 Open(bdb_options, options);
2295 mock_env_->set_current_time(50);
2296 std::map<std::string, std::string> data;
2297 ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
2298 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2299 ASSERT_EQ(1, blob_files.size());
2300 auto blob_file = blob_files[0];
2301 ASSERT_FALSE(blob_file->Immutable());
2302 ASSERT_FALSE(blob_file->Obsolete());
2303 VerifyDB(data);
2304
2305 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
2306 mock_env_->set_current_time(250);
2307 // The key should expired now.
2308 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
2309
2310 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
2311 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
2312 Close();
2313
2314 SyncPoint::GetInstance()->DisableProcessing();
2315}
2316
20effc67
TL
2317TEST_F(BlobDBTest, SyncBlobFileBeforeClose) {
2318 Options options;
2319 options.statistics = CreateDBStatistics();
2320
2321 BlobDBOptions blob_options;
2322 blob_options.min_blob_size = 0;
2323 blob_options.bytes_per_sync = 1 << 20;
2324 blob_options.disable_background_tasks = true;
2325
2326 Open(blob_options, options);
2327
2328 Put("foo", "bar");
2329
2330 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2331 ASSERT_EQ(blob_files.size(), 1);
2332
2333 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
2334 ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1);
2335}
2336
2337TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
2338 Options options;
2339 options.env = fault_injection_env_.get();
2340
2341 BlobDBOptions blob_options;
2342 blob_options.min_blob_size = 0;
2343 blob_options.bytes_per_sync = 1 << 20;
2344 blob_options.disable_background_tasks = true;
2345
2346 Open(blob_options, options);
2347
2348 Put("foo", "bar");
2349
2350 auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
2351 ASSERT_EQ(blob_files.size(), 1);
2352
2353 SyncPoint::GetInstance()->SetCallBack(
2354 "BlobLogWriter::Sync", [this](void * /* arg */) {
2355 fault_injection_env_->SetFilesystemActive(false, Status::IOError());
2356 });
2357 SyncPoint::GetInstance()->EnableProcessing();
2358
2359 const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]);
2360
2361 fault_injection_env_->SetFilesystemActive(true);
2362 SyncPoint::GetInstance()->DisableProcessing();
2363 SyncPoint::GetInstance()->ClearAllCallBacks();
2364
2365 ASSERT_TRUE(s.IsIOError());
2366}
2367
11fdf7f2 2368} // namespace blob_db
f67539c2 2369} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
2370
2371// A black-box test for the ttl wrapper around rocksdb
2372int main(int argc, char** argv) {
2373 ::testing::InitGoogleTest(&argc, argv);
2374 return RUN_ALL_TESTS();
2375}
2376
2377#else
2378#include <stdio.h>
2379
// ROCKSDB_LITE build: there is nothing to run, so report the skip on stderr
// and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
2384
2385#endif // !ROCKSDB_LITE