1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
13 #include "db/db_impl/db_impl.h"
14 #include "db/db_test_util.h"
15 #include "env/mock_env.h"
16 #include "file/filename.h"
17 #include "port/port.h"
18 #include "port/stack_trace.h"
19 #include "rocksdb/utilities/transaction_db.h"
20 #include "test_util/sync_point.h"
21 #include "test_util/testutil.h"
22 #include "util/cast_util.h"
23 #include "util/mutexlock.h"
24 #include "utilities/fault_injection_env.h"
25 #include "utilities/fault_injection_fs.h"
27 namespace ROCKSDB_NAMESPACE
{
// This is a static filter used for filtering
// kvs during the compaction process.
// (File-local; kept `static` to preserve internal linkage in this .cc.)
static std::string NEW_VALUE = "NewValue";
33 class DBFlushTest
: public DBTestBase
{
35 DBFlushTest() : DBTestBase("db_flush_test", /*env_do_fsync=*/true) {}
38 class DBFlushDirectIOTest
: public DBFlushTest
,
39 public ::testing::WithParamInterface
<bool> {
41 DBFlushDirectIOTest() : DBFlushTest() {}
44 class DBAtomicFlushTest
: public DBFlushTest
,
45 public ::testing::WithParamInterface
<bool> {
47 DBAtomicFlushTest() : DBFlushTest() {}
50 // We had issue when two background threads trying to flush at the same time,
51 // only one of them get committed. The test verifies the issue is fixed.
// Regression test: when two background flushes race, both must be committed.
// Sync points force the first flush's MANIFEST write to overlap with the
// second flush's TryInstallMemtableFlushResults.
// NOTE(review): this span is a lossy extraction — several original lines are
// missing (the declarations of `options` and the FlushOptions `no_wait`
// with wait=false, the Reopen call, the `#ifndef ROCKSDB_LITE` guard matching
// the `#endif` below, and the test's closing brace). Restore from upstream
// before building.
52 TEST_F(DBFlushTest
, FlushWhileWritingManifest
) {
54 options
.disable_auto_compactions
= true;
55 options
.max_background_flushes
= 2;
// allow_write_stall=true lets the non-blocking flush request proceed even if
// it would otherwise stall writes.
60 no_wait
.allow_write_stall
= true;
// Dependency: the first flush blocks in the MANIFEST write until the second
// flush reaches TryInstallMemtableFlushResults:InProgress.
62 SyncPoint::GetInstance()->LoadDependency(
63 {{"VersionSet::LogAndApply:WriteManifest",
64 "DBFlushTest::FlushWhileWritingManifest:1"},
65 {"MemTableList::TryInstallMemtableFlushResults:InProgress",
66 "VersionSet::LogAndApply:WriteManifestDone"}});
67 SyncPoint::GetInstance()->EnableProcessing();
69 ASSERT_OK(Put("foo", "v"));
70 ASSERT_OK(dbfull()->Flush(no_wait
));
71 TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
72 ASSERT_OK(Put("bar", "v"));
73 ASSERT_OK(dbfull()->Flush(no_wait
));
74 // If the issue is hit we will wait here forever.
75 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// Both flushes must have committed and produced a table file each.
77 ASSERT_EQ(2, TotalTableFiles());
78 #endif // ROCKSDB_LITE
81 // Disable this test temporarily on Travis as it fails intermittently.
82 // Github issue: #4151
83 TEST_F(DBFlushTest
, SyncFail
) {
84 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
85 new FaultInjectionTestEnv(env_
));
87 options
.disable_auto_compactions
= true;
88 options
.env
= fault_injection_env
.get();
90 SyncPoint::GetInstance()->LoadDependency(
91 {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
92 {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
93 SyncPoint::GetInstance()->EnableProcessing();
95 CreateAndReopenWithCF({"pikachu"}, options
);
96 ASSERT_OK(Put("key", "value"));
97 FlushOptions flush_options
;
98 flush_options
.wait
= false;
99 ASSERT_OK(dbfull()->Flush(flush_options
));
100 // Flush installs a new super-version. Get the ref count after that.
101 fault_injection_env
->SetFilesystemActive(false);
102 TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
103 TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
104 fault_injection_env
->SetFilesystemActive(true);
105 // Now the background job will do the flush; wait for it.
106 // Returns the IO error happend during flush.
107 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
109 ASSERT_EQ("", FilesPerLevel()); // flush failed.
110 #endif // ROCKSDB_LITE
// Verifies that when WAL sync is skipped (SyncClosedLogs:Skip), a pending
// background flush still completes successfully.
// NOTE(review): lossy extraction — missing original lines include the
// Reopen(options) call and the test's closing brace; restore from upstream
// before building.
114 TEST_F(DBFlushTest
, SyncSkip
) {
115 Options options
= CurrentOptions();
// Order the test body around the SyncClosedLogs:Skip sync point so the skip
// path is exercised deterministically.
117 SyncPoint::GetInstance()->LoadDependency(
118 {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
119 {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
120 SyncPoint::GetInstance()->EnableProcessing();
123 ASSERT_OK(Put("key", "value"));
// Non-blocking flush: request it, then coordinate with the background job
// through the sync points above.
125 FlushOptions flush_options
;
126 flush_options
.wait
= false;
127 ASSERT_OK(dbfull()->Flush(flush_options
));
129 TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
130 TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
132 // Now the background job will do the flush; wait for it.
133 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
138 TEST_F(DBFlushTest
, FlushInLowPriThreadPool
) {
139 // Verify setting an empty high-pri (flush) thread pool causes flushes to be
140 // scheduled in the low-pri (compaction) thread pool.
141 Options options
= CurrentOptions();
142 options
.level0_file_num_compaction_trigger
= 4;
143 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(1));
145 env_
->SetBackgroundThreads(0, Env::HIGH
);
148 int num_flushes
= 0, num_compactions
= 0;
149 SyncPoint::GetInstance()->SetCallBack(
150 "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
151 if (tid
== std::thread::id()) {
152 tid
= std::this_thread::get_id();
154 ASSERT_EQ(tid
, std::this_thread::get_id());
158 SyncPoint::GetInstance()->SetCallBack(
159 "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
160 ASSERT_EQ(tid
, std::this_thread::get_id());
163 SyncPoint::GetInstance()->EnableProcessing();
165 ASSERT_OK(Put("key", "val"));
166 for (int i
= 0; i
< 4; ++i
) {
167 ASSERT_OK(Put("key", "val"));
168 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
170 ASSERT_OK(dbfull()->TEST_WaitForCompact());
171 ASSERT_EQ(4, num_flushes
);
172 ASSERT_EQ(1, num_compactions
);
175 // Test when flush job is submitted to low priority thread pool and when DB is
176 // closed in the meanwhile, CloseHelper doesn't hang.
177 TEST_F(DBFlushTest
, CloseDBWhenFlushInLowPri
) {
178 Options options
= CurrentOptions();
179 options
.max_background_flushes
= 1;
180 options
.max_total_wal_size
= 8192;
182 DestroyAndReopen(options
);
183 CreateColumnFamilies({"cf1", "cf2"}, options
);
185 env_
->SetBackgroundThreads(0, Env::HIGH
);
186 env_
->SetBackgroundThreads(1, Env::LOW
);
187 test::SleepingBackgroundTask sleeping_task_low
;
190 SyncPoint::GetInstance()->SetCallBack("DBImpl::BGWorkFlush",
191 [&](void* /*arg*/) { ++num_flushes
; });
193 int num_low_flush_unscheduled
= 0;
194 SyncPoint::GetInstance()->SetCallBack(
195 "DBImpl::UnscheduleLowFlushCallback", [&](void* /*arg*/) {
196 num_low_flush_unscheduled
++;
197 // There should be one flush job in low pool that needs to be
199 ASSERT_EQ(num_low_flush_unscheduled
, 1);
202 int num_high_flush_unscheduled
= 0;
203 SyncPoint::GetInstance()->SetCallBack(
204 "DBImpl::UnscheduleHighFlushCallback", [&](void* /*arg*/) {
205 num_high_flush_unscheduled
++;
206 // There should be no flush job in high pool
207 ASSERT_EQ(num_high_flush_unscheduled
, 0);
210 SyncPoint::GetInstance()->EnableProcessing();
212 ASSERT_OK(Put(0, "key1", DummyString(8192)));
213 // Block thread so that flush cannot be run and can be removed from the queue
214 // when called Unschedule.
215 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
217 sleeping_task_low
.WaitUntilSleeping();
219 // Trigger flush and flush job will be scheduled to LOW priority thread.
220 ASSERT_OK(Put(0, "key2", DummyString(8192)));
222 // Close DB and flush job in low priority queue will be removed without
225 sleeping_task_low
.WakeUp();
226 sleeping_task_low
.WaitUntilDone();
227 ASSERT_EQ(0, num_flushes
);
229 TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, options
);
230 ASSERT_OK(Put(0, "key3", DummyString(8192)));
232 ASSERT_EQ(1, num_flushes
);
235 TEST_F(DBFlushTest
, ManualFlushWithMinWriteBufferNumberToMerge
) {
236 Options options
= CurrentOptions();
237 options
.write_buffer_size
= 100;
238 options
.max_write_buffer_number
= 4;
239 options
.min_write_buffer_number_to_merge
= 3;
242 SyncPoint::GetInstance()->LoadDependency(
243 {{"DBImpl::BGWorkFlush",
244 "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
245 {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
246 "FlushJob::WriteLevel0Table"}});
247 SyncPoint::GetInstance()->EnableProcessing();
249 ASSERT_OK(Put("key1", "value1"));
251 port::Thread
t([&]() {
252 // The call wait for flush to finish, i.e. with flush_options.wait = true.
256 // Wait for flush start.
257 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
258 // Insert a second memtable before the manual flush finish.
259 // At the end of the manual flush job, it will check if further flush
260 // is needed, but it will not trigger flush of the second memtable because
261 // min_write_buffer_number_to_merge is not reached.
262 ASSERT_OK(Put("key2", "value2"));
263 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
264 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
266 // Manual flush should return, without waiting for flush indefinitely.
270 TEST_F(DBFlushTest
, ScheduleOnlyOneBgThread
) {
271 Options options
= CurrentOptions();
273 SyncPoint::GetInstance()->DisableProcessing();
274 SyncPoint::GetInstance()->ClearAllCallBacks();
276 SyncPoint::GetInstance()->SetCallBack(
277 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg
) {
278 ASSERT_NE(nullptr, arg
);
279 auto unscheduled_flushes
= *reinterpret_cast<int*>(arg
);
280 ASSERT_EQ(0, unscheduled_flushes
);
283 SyncPoint::GetInstance()->EnableProcessing();
285 ASSERT_OK(Put("a", "foo"));
286 FlushOptions flush_opts
;
287 ASSERT_OK(dbfull()->Flush(flush_opts
));
288 ASSERT_EQ(1, called
);
290 SyncPoint::GetInstance()->DisableProcessing();
291 SyncPoint::GetInstance()->ClearAllCallBacks();
294 // The following 3 tests are designed for testing garbage statistics at flush
297 // ======= General Information ======= (from GitHub Wiki).
298 // There are three scenarios where memtable flush can be triggered:
300 // 1 - Memtable size exceeds ColumnFamilyOptions::write_buffer_size
302 // 2 - Total memtable size across all column families exceeds
303 // DBOptions::db_write_buffer_size,
304 // or DBOptions::write_buffer_manager signals a flush. In this scenario
305 // the largest memtable will be flushed.
306 // 3 - Total WAL file size exceeds DBOptions::max_total_wal_size.
307 // In this scenario the memtable with the oldest data will be flushed,
308 // in order to allow the WAL file with data from this memtable to be
311 // As a result, a memtable can be flushed before it is full. This is one
312 // reason the generated SST file can be smaller than the corresponding
313 // memtable. Compression is another factor to make SST file smaller than
314 // corresponding memtable, since data in memtable is uncompressed.
316 TEST_F(DBFlushTest
, StatisticsGarbageBasic
) {
317 Options options
= CurrentOptions();
319 // The following options are used to enforce several values that
320 // may already exist as default values to make this test resilient
321 // to default value updates in the future.
322 options
.statistics
= CreateDBStatistics();
324 // Record all statistics.
325 options
.statistics
->set_stats_level(StatsLevel::kAll
);
327 // create the DB if it's not already present
328 options
.create_if_missing
= true;
330 // Useful for now as we are trying to compare uncompressed data savings on
332 options
.compression
= kNoCompression
;
334 // Prevent memtable in place updates. Should already be disabled
336 // In place updates can be enabled by toggling on the bool
337 // inplace_update_support flag. However, this flag is by default set to
339 // because this thread-safe in-place update support is not compatible
340 // with concurrent memtable writes. Note that the bool
341 // allow_concurrent_memtable_write is set to true by default )
342 options
.inplace_update_support
= false;
343 options
.allow_concurrent_memtable_write
= true;
345 // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
346 options
.write_buffer_size
= 64 << 20;
348 ASSERT_OK(TryReopen(options
));
350 // Put multiple times the same key-values.
351 // The encoded length of a db entry in the memtable is
352 // defined in db/memtable.cc (MemTable::Add) as the variable:
353 // encoded_len= VarintLength(internal_key_size) --> =
354 // log_256(internal_key).
358 // internal_key_size.
359 // + internal_key_size --> = actual key string,
360 // (size key_size: w/o term null char)
366 // + VarintLength(val_size) --> = min # of bytes to
368 // + val_size --> = actual value
370 // For example, in our situation, "key1" : size 4, "value1" : size 6
371 // (the terminating null characters are not copied over to the memtable).
372 // And therefore encoded_len = 1 + (4+8) + 1 + 6 = 20 bytes per entry.
373 // However in terms of raw data contained in the memtable, and written
374 // over to the SSTable, we only count internal_key_size and val_size,
375 // because this is the only raw chunk of bytes that contains everything
376 // necessary to reconstruct a user entry: sequence number, insertion type,
379 // To test the relevance of our Memtable garbage statistics,
380 // namely MEMTABLE_PAYLOAD_BYTES_AT_FLUSH and MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
381 // we insert K-V pairs with 3 distinct keys (of length 4),
382 // and random values of arbitrary length RAND_VALUES_LENGTH,
383 // and we repeat this step NUM_REPEAT times total.
384 // At the end, we insert 3 final K-V pairs with the same 3 keys
385 // and known values (these will be the final values, of length 6).
386 // I chose NUM_REPEAT=2,000 such that no automatic flush is
387 // triggered (the number of bytes in the memtable is therefore
388 // well below any meaningful heuristic for a memtable of size 64MB).
389 // As a result, since each K-V pair is inserted as a payload
390 // of N meaningful bytes (sequence number, insertion type,
391 // key, and value = 8 + 4 + RAND_VALUE_LENGTH),
392 // MEMTABLE_GARBAGE_BYTES_AT_FLUSH should be equal to 2,000 * N bytes
393 // and MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = MEMTABLE_GARBAGE_BYTES_AT_FLUSH +
394 // (3*(8 + 4 + 6)) bytes. For RAND_VALUE_LENGTH = 172 (arbitrary value), we
396 // N = 8 + 4 + 172 = 184 bytes
397 // MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 2,000 * 184 = 368,000 bytes.
398 // MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 368,000 + 3*18 = 368,054 bytes.
400 const size_t NUM_REPEAT
= 2000;
401 const size_t RAND_VALUES_LENGTH
= 172;
402 const std::string KEY1
= "key1";
403 const std::string KEY2
= "key2";
404 const std::string KEY3
= "key3";
405 const std::string VALUE1
= "value1";
406 const std::string VALUE2
= "value2";
407 const std::string VALUE3
= "value3";
408 uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
= 0;
409 uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
= 0;
412 // Insertion of K-V pairs, multiple times.
413 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
414 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
415 std::string p_v1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
416 std::string p_v2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
417 std::string p_v3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
418 ASSERT_OK(Put(KEY1
, p_v1
));
419 ASSERT_OK(Put(KEY2
, p_v2
));
420 ASSERT_OK(Put(KEY3
, p_v3
));
421 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
422 KEY1
.size() + p_v1
.size() + sizeof(uint64_t);
423 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
424 KEY2
.size() + p_v2
.size() + sizeof(uint64_t);
425 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
426 KEY3
.size() + p_v3
.size() + sizeof(uint64_t);
429 // The memtable data bytes includes the "garbage"
430 // bytes along with the useful payload.
431 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
=
432 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
;
434 ASSERT_OK(Put(KEY1
, VALUE1
));
435 ASSERT_OK(Put(KEY2
, VALUE2
));
436 ASSERT_OK(Put(KEY3
, VALUE3
));
438 // Add useful payload to the memtable data bytes:
439 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
+=
440 KEY1
.size() + VALUE1
.size() + KEY2
.size() + VALUE2
.size() + KEY3
.size() +
441 VALUE3
.size() + 3 * sizeof(uint64_t);
443 // We assert that the last K-V pairs have been successfully inserted,
444 // and that the valid values are VALUE1, VALUE2, VALUE3.
446 ASSERT_OK(Get(KEY1
, &value
));
447 ASSERT_EQ(value
.ToString(), VALUE1
);
448 ASSERT_OK(Get(KEY2
, &value
));
449 ASSERT_EQ(value
.ToString(), VALUE2
);
450 ASSERT_OK(Get(KEY3
, &value
));
451 ASSERT_EQ(value
.ToString(), VALUE3
);
453 // Force flush to SST. Increments the statistics counter.
456 // Collect statistics.
457 uint64_t mem_data_bytes
=
458 TestGetTickerCount(options
, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
);
459 uint64_t mem_garbage_bytes
=
460 TestGetTickerCount(options
, MEMTABLE_GARBAGE_BYTES_AT_FLUSH
);
462 EXPECT_EQ(mem_data_bytes
, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
);
463 EXPECT_EQ(mem_garbage_bytes
, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
);
468 TEST_F(DBFlushTest
, StatisticsGarbageInsertAndDeletes
) {
469 Options options
= CurrentOptions();
470 options
.statistics
= CreateDBStatistics();
471 options
.statistics
->set_stats_level(StatsLevel::kAll
);
472 options
.create_if_missing
= true;
473 options
.compression
= kNoCompression
;
474 options
.inplace_update_support
= false;
475 options
.allow_concurrent_memtable_write
= true;
476 options
.write_buffer_size
= 67108864;
478 ASSERT_OK(TryReopen(options
));
480 const size_t NUM_REPEAT
= 2000;
481 const size_t RAND_VALUES_LENGTH
= 37;
482 const std::string KEY1
= "key1";
483 const std::string KEY2
= "key2";
484 const std::string KEY3
= "key3";
485 const std::string KEY4
= "key4";
486 const std::string KEY5
= "key5";
487 const std::string KEY6
= "key6";
489 uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
= 0;
490 uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
= 0;
495 // Insertion of K-V pairs, multiple times.
496 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
497 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
498 std::string p_v1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
499 std::string p_v2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
500 std::string p_v3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
501 ASSERT_OK(Put(KEY1
, p_v1
));
502 ASSERT_OK(Put(KEY2
, p_v2
));
503 ASSERT_OK(Put(KEY3
, p_v3
));
504 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
505 KEY1
.size() + p_v1
.size() + sizeof(uint64_t);
506 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
507 KEY2
.size() + p_v2
.size() + sizeof(uint64_t);
508 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
509 KEY3
.size() + p_v3
.size() + sizeof(uint64_t);
510 ASSERT_OK(Delete(KEY1
));
511 ASSERT_OK(Delete(KEY2
));
512 ASSERT_OK(Delete(KEY3
));
513 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
514 KEY1
.size() + KEY2
.size() + KEY3
.size() + 3 * sizeof(uint64_t);
517 // The memtable data bytes includes the "garbage"
518 // bytes along with the useful payload.
519 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
=
520 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
;
522 // Note : one set of delete for KEY1, KEY2, KEY3 is written to
523 // SSTable to propagate the delete operations to K-V pairs
524 // that could have been inserted into the database during past Flush
526 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
-=
527 KEY1
.size() + KEY2
.size() + KEY3
.size() + 3 * sizeof(uint64_t);
529 // Additional useful payload.
530 ASSERT_OK(Delete(KEY4
));
531 ASSERT_OK(Delete(KEY5
));
532 ASSERT_OK(Delete(KEY6
));
534 // Add useful payload to the memtable data bytes:
535 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
+=
536 KEY4
.size() + KEY5
.size() + KEY6
.size() + 3 * sizeof(uint64_t);
538 // We assert that the K-V pairs have been successfully deleted.
540 ASSERT_NOK(Get(KEY1
, &value
));
541 ASSERT_NOK(Get(KEY2
, &value
));
542 ASSERT_NOK(Get(KEY3
, &value
));
544 // Force flush to SST. Increments the statistics counter.
547 // Collect statistics.
548 uint64_t mem_data_bytes
=
549 TestGetTickerCount(options
, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
);
550 uint64_t mem_garbage_bytes
=
551 TestGetTickerCount(options
, MEMTABLE_GARBAGE_BYTES_AT_FLUSH
);
553 EXPECT_EQ(mem_data_bytes
, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
);
554 EXPECT_EQ(mem_garbage_bytes
, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
);
559 TEST_F(DBFlushTest
, StatisticsGarbageRangeDeletes
) {
560 Options options
= CurrentOptions();
561 options
.statistics
= CreateDBStatistics();
562 options
.statistics
->set_stats_level(StatsLevel::kAll
);
563 options
.create_if_missing
= true;
564 options
.compression
= kNoCompression
;
565 options
.inplace_update_support
= false;
566 options
.allow_concurrent_memtable_write
= true;
567 options
.write_buffer_size
= 67108864;
569 ASSERT_OK(TryReopen(options
));
571 const size_t NUM_REPEAT
= 1000;
572 const size_t RAND_VALUES_LENGTH
= 42;
573 const std::string KEY1
= "key1";
574 const std::string KEY2
= "key2";
575 const std::string KEY3
= "key3";
576 const std::string KEY4
= "key4";
577 const std::string KEY5
= "key5";
578 const std::string KEY6
= "key6";
579 const std::string VALUE3
= "value3";
581 uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
= 0;
582 uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
= 0;
585 // Insertion of K-V pairs, multiple times.
586 // Also insert DeleteRange
587 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
588 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
589 std::string p_v1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
590 std::string p_v2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
591 std::string p_v3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
592 ASSERT_OK(Put(KEY1
, p_v1
));
593 ASSERT_OK(Put(KEY2
, p_v2
));
594 ASSERT_OK(Put(KEY3
, p_v3
));
595 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
596 KEY1
.size() + p_v1
.size() + sizeof(uint64_t);
597 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
598 KEY2
.size() + p_v2
.size() + sizeof(uint64_t);
599 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
600 KEY3
.size() + p_v3
.size() + sizeof(uint64_t);
601 ASSERT_OK(db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(), KEY1
,
603 // Note: DeleteRange have an exclusive upper bound, e.g. here: [KEY2,KEY3)
605 ASSERT_OK(db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(), KEY2
,
607 // Delete ranges are stored as a regular K-V pair, with key=STARTKEY,
609 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
+=
610 (KEY1
.size() + KEY2
.size() + sizeof(uint64_t)) +
611 (KEY2
.size() + KEY3
.size() + sizeof(uint64_t));
614 // The memtable data bytes includes the "garbage"
615 // bytes along with the useful payload.
616 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
=
617 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
;
619 // Note : one set of deleteRange for (KEY1, KEY2) and (KEY2, KEY3) is written
620 // to SSTable to propagate the deleteRange operations to K-V pairs that could
621 // have been inserted into the database during past Flush operations.
622 EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
-=
623 (KEY1
.size() + KEY2
.size() + sizeof(uint64_t)) +
624 (KEY2
.size() + KEY3
.size() + sizeof(uint64_t));
626 // Overwrite KEY3 with known value (VALUE3)
627 // Note that during the whole time KEY3 has never been deleted
628 // by the RangeDeletes.
629 ASSERT_OK(Put(KEY3
, VALUE3
));
630 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
+=
631 KEY3
.size() + VALUE3
.size() + sizeof(uint64_t);
633 // Additional useful payload.
635 db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(), KEY4
, KEY5
));
637 db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(), KEY5
, KEY6
));
639 // Add useful payload to the memtable data bytes:
640 EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
+=
641 (KEY4
.size() + KEY5
.size() + sizeof(uint64_t)) +
642 (KEY5
.size() + KEY6
.size() + sizeof(uint64_t));
644 // We assert that the K-V pairs have been successfully deleted.
646 ASSERT_NOK(Get(KEY1
, &value
));
647 ASSERT_NOK(Get(KEY2
, &value
));
648 // And that KEY3's value is correct.
649 ASSERT_OK(Get(KEY3
, &value
));
650 ASSERT_EQ(value
, VALUE3
);
652 // Force flush to SST. Increments the statistics counter.
655 // Collect statistics.
656 uint64_t mem_data_bytes
=
657 TestGetTickerCount(options
, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
);
658 uint64_t mem_garbage_bytes
=
659 TestGetTickerCount(options
, MEMTABLE_GARBAGE_BYTES_AT_FLUSH
);
661 EXPECT_EQ(mem_data_bytes
, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH
);
662 EXPECT_EQ(mem_garbage_bytes
, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH
);
668 // This simple Listener can only handle one flush at a time.
669 class TestFlushListener
: public EventListener
{
671 TestFlushListener(Env
* env
, DBFlushTest
* test
)
672 : slowdown_count(0), stop_count(0), db_closed(), env_(env
), test_(test
) {
676 ~TestFlushListener() override
{
677 prev_fc_info_
.status
.PermitUncheckedError(); // Ignore the status
680 void OnTableFileCreated(const TableFileCreationInfo
& info
) override
{
681 // remember the info for later checking the FlushJobInfo.
682 prev_fc_info_
= info
;
683 ASSERT_GT(info
.db_name
.size(), 0U);
684 ASSERT_GT(info
.cf_name
.size(), 0U);
685 ASSERT_GT(info
.file_path
.size(), 0U);
686 ASSERT_GT(info
.job_id
, 0);
687 ASSERT_GT(info
.table_properties
.data_size
, 0U);
688 ASSERT_GT(info
.table_properties
.raw_key_size
, 0U);
689 ASSERT_GT(info
.table_properties
.raw_value_size
, 0U);
690 ASSERT_GT(info
.table_properties
.num_data_blocks
, 0U);
691 ASSERT_GT(info
.table_properties
.num_entries
, 0U);
692 ASSERT_EQ(info
.file_checksum
, kUnknownFileChecksum
);
693 ASSERT_EQ(info
.file_checksum_func_name
, kUnknownFileChecksumFuncName
);
696 void OnFlushCompleted(DB
* db
, const FlushJobInfo
& info
) override
{
697 flushed_dbs_
.push_back(db
);
698 flushed_column_family_names_
.push_back(info
.cf_name
);
699 if (info
.triggered_writes_slowdown
) {
702 if (info
.triggered_writes_stop
) {
705 // verify whether the previously created file matches the flushed file.
706 ASSERT_EQ(prev_fc_info_
.db_name
, db
->GetName());
707 ASSERT_EQ(prev_fc_info_
.cf_name
, info
.cf_name
);
708 ASSERT_EQ(prev_fc_info_
.job_id
, info
.job_id
);
709 ASSERT_EQ(prev_fc_info_
.file_path
, info
.file_path
);
710 ASSERT_EQ(TableFileNameToNumber(info
.file_path
), info
.file_number
);
712 // Note: the following chunk relies on the notification pertaining to the
713 // database pointed to by DBTestBase::db_, and is thus bypassed when
714 // that assumption does not hold (see the test case MultiDBMultiListeners
717 if (db
== test_
->db_
) {
718 std::vector
<std::vector
<FileMetaData
>> files_by_level
;
719 test_
->dbfull()->TEST_GetFilesMetaData(db
->DefaultColumnFamily(),
722 ASSERT_FALSE(files_by_level
.empty());
723 auto it
= std::find_if(files_by_level
[0].begin(), files_by_level
[0].end(),
724 [&](const FileMetaData
& meta
) {
725 return meta
.fd
.GetNumber() == info
.file_number
;
727 ASSERT_NE(it
, files_by_level
[0].end());
728 ASSERT_EQ(info
.oldest_blob_file_number
, it
->oldest_blob_file_number
);
731 ASSERT_EQ(db
->GetEnv()->GetThreadID(), info
.thread_id
);
732 ASSERT_GT(info
.thread_id
, 0U);
735 std::vector
<std::string
> flushed_column_family_names_
;
736 std::vector
<DB
*> flushed_dbs_
;
740 std::atomic_bool db_closed
;
741 TableFileCreationInfo prev_fc_info_
;
747 #endif // !ROCKSDB_LITE
749 TEST_F(DBFlushTest
, MemPurgeBasic
) {
750 Options options
= CurrentOptions();
752 // The following options are used to enforce several values that
753 // may already exist as default values to make this test resilient
754 // to default value updates in the future.
755 options
.statistics
= CreateDBStatistics();
757 // Record all statistics.
758 options
.statistics
->set_stats_level(StatsLevel::kAll
);
760 // create the DB if it's not already present
761 options
.create_if_missing
= true;
763 // Useful for now as we are trying to compare uncompressed data savings on
765 options
.compression
= kNoCompression
;
767 // Prevent memtable in place updates. Should already be disabled
769 // In place updates can be enabled by toggling on the bool
770 // inplace_update_support flag. However, this flag is by default set to
772 // because this thread-safe in-place update support is not compatible
773 // with concurrent memtable writes. Note that the bool
774 // allow_concurrent_memtable_write is set to true by default )
775 options
.inplace_update_support
= false;
776 options
.allow_concurrent_memtable_write
= true;
778 // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
779 options
.write_buffer_size
= 1 << 20;
781 // Initially deactivate the MemPurge prototype.
782 options
.experimental_mempurge_threshold
= 0.0;
783 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
784 options
.listeners
.emplace_back(listener
);
786 // Activate directly the MemPurge prototype.
787 // (RocksDB lite does not support dynamic options)
788 options
.experimental_mempurge_threshold
= 1.0;
789 #endif // !ROCKSDB_LITE
790 ASSERT_OK(TryReopen(options
));
792 // RocksDB lite does not support dynamic options
794 // Dynamically activate the MemPurge prototype without restarting the DB.
795 ColumnFamilyHandle
* cfh
= db_
->DefaultColumnFamily();
796 ASSERT_OK(db_
->SetOptions(cfh
, {{"experimental_mempurge_threshold", "1.0"}}));
799 std::atomic
<uint32_t> mempurge_count
{0};
800 std::atomic
<uint32_t> sst_count
{0};
801 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
802 "DBImpl::FlushJob:MemPurgeSuccessful",
803 [&](void* /*arg*/) { mempurge_count
++; });
804 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
805 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
806 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
808 std::string KEY1
= "IamKey1";
809 std::string KEY2
= "IamKey2";
810 std::string KEY3
= "IamKey3";
811 std::string KEY4
= "IamKey4";
812 std::string KEY5
= "IamKey5";
813 std::string KEY6
= "IamKey6";
814 std::string KEY7
= "IamKey7";
815 std::string KEY8
= "IamKey8";
816 std::string KEY9
= "IamKey9";
817 std::string RNDKEY1
, RNDKEY2
, RNDKEY3
;
818 const std::string NOT_FOUND
= "NOT_FOUND";
820 // Heavy overwrite workload,
821 // more than would fit in maximum allowed memtables.
823 const size_t NUM_REPEAT
= 100;
824 const size_t RAND_KEYS_LENGTH
= 57;
825 const size_t RAND_VALUES_LENGTH
= 10240;
826 std::string p_v1
, p_v2
, p_v3
, p_v4
, p_v5
, p_v6
, p_v7
, p_v8
, p_v9
, p_rv1
,
829 // Insert a very first set of keys that will be
830 // mempurged at least once.
831 p_v1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
832 p_v2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
833 p_v3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
834 p_v4
= rnd
.RandomString(RAND_VALUES_LENGTH
);
835 ASSERT_OK(Put(KEY1
, p_v1
));
836 ASSERT_OK(Put(KEY2
, p_v2
));
837 ASSERT_OK(Put(KEY3
, p_v3
));
838 ASSERT_OK(Put(KEY4
, p_v4
));
839 ASSERT_EQ(Get(KEY1
), p_v1
);
840 ASSERT_EQ(Get(KEY2
), p_v2
);
841 ASSERT_EQ(Get(KEY3
), p_v3
);
842 ASSERT_EQ(Get(KEY4
), p_v4
);
844 // Insertion of K-V pairs, multiple times (overwrites).
845 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
846 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
847 p_v5
= rnd
.RandomString(RAND_VALUES_LENGTH
);
848 p_v6
= rnd
.RandomString(RAND_VALUES_LENGTH
);
849 p_v7
= rnd
.RandomString(RAND_VALUES_LENGTH
);
850 p_v8
= rnd
.RandomString(RAND_VALUES_LENGTH
);
851 p_v9
= rnd
.RandomString(RAND_VALUES_LENGTH
);
853 ASSERT_OK(Put(KEY5
, p_v5
));
854 ASSERT_OK(Put(KEY6
, p_v6
));
855 ASSERT_OK(Put(KEY7
, p_v7
));
856 ASSERT_OK(Put(KEY8
, p_v8
));
857 ASSERT_OK(Put(KEY9
, p_v9
));
859 ASSERT_EQ(Get(KEY1
), p_v1
);
860 ASSERT_EQ(Get(KEY2
), p_v2
);
861 ASSERT_EQ(Get(KEY3
), p_v3
);
862 ASSERT_EQ(Get(KEY4
), p_v4
);
863 ASSERT_EQ(Get(KEY5
), p_v5
);
864 ASSERT_EQ(Get(KEY6
), p_v6
);
865 ASSERT_EQ(Get(KEY7
), p_v7
);
866 ASSERT_EQ(Get(KEY8
), p_v8
);
867 ASSERT_EQ(Get(KEY9
), p_v9
);
870 // Check that there was at least one mempurge
871 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT
= 1;
872 // Check that there was no SST files created during flush.
873 const uint32_t EXPECTED_SST_COUNT
= 0;
875 EXPECT_GE(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
876 EXPECT_EQ(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
878 // Insertion of of K-V pairs, no overwrites.
879 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
880 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
881 RNDKEY1
= rnd
.RandomString(RAND_KEYS_LENGTH
);
882 RNDKEY2
= rnd
.RandomString(RAND_KEYS_LENGTH
);
883 RNDKEY3
= rnd
.RandomString(RAND_KEYS_LENGTH
);
884 p_rv1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
885 p_rv2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
886 p_rv3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
888 ASSERT_OK(Put(RNDKEY1
, p_rv1
));
889 ASSERT_OK(Put(RNDKEY2
, p_rv2
));
890 ASSERT_OK(Put(RNDKEY3
, p_rv3
));
892 ASSERT_EQ(Get(KEY1
), p_v1
);
893 ASSERT_EQ(Get(KEY2
), p_v2
);
894 ASSERT_EQ(Get(KEY3
), p_v3
);
895 ASSERT_EQ(Get(KEY4
), p_v4
);
896 ASSERT_EQ(Get(KEY5
), p_v5
);
897 ASSERT_EQ(Get(KEY6
), p_v6
);
898 ASSERT_EQ(Get(KEY7
), p_v7
);
899 ASSERT_EQ(Get(KEY8
), p_v8
);
900 ASSERT_EQ(Get(KEY9
), p_v9
);
901 ASSERT_EQ(Get(RNDKEY1
), p_rv1
);
902 ASSERT_EQ(Get(RNDKEY2
), p_rv2
);
903 ASSERT_EQ(Get(RNDKEY3
), p_rv3
);
906 // Assert that at least one flush to storage has been performed
907 EXPECT_GT(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
908 // (which will consequently increase the number of mempurges recorded too).
909 EXPECT_GE(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
911 // Assert that there is no data corruption, even with
912 // a flush to storage.
913 ASSERT_EQ(Get(KEY1
), p_v1
);
914 ASSERT_EQ(Get(KEY2
), p_v2
);
915 ASSERT_EQ(Get(KEY3
), p_v3
);
916 ASSERT_EQ(Get(KEY4
), p_v4
);
917 ASSERT_EQ(Get(KEY5
), p_v5
);
918 ASSERT_EQ(Get(KEY6
), p_v6
);
919 ASSERT_EQ(Get(KEY7
), p_v7
);
920 ASSERT_EQ(Get(KEY8
), p_v8
);
921 ASSERT_EQ(Get(KEY9
), p_v9
);
922 ASSERT_EQ(Get(RNDKEY1
), p_rv1
);
923 ASSERT_EQ(Get(RNDKEY2
), p_rv2
);
924 ASSERT_EQ(Get(RNDKEY3
), p_rv3
);
929 // RocksDB lite does not support dynamic options
931 TEST_F(DBFlushTest
, MemPurgeBasicToggle
) {
932 Options options
= CurrentOptions();
934 // The following options are used to enforce several values that
935 // may already exist as default values to make this test resilient
936 // to default value updates in the future.
937 options
.statistics
= CreateDBStatistics();
939 // Record all statistics.
940 options
.statistics
->set_stats_level(StatsLevel::kAll
);
942 // create the DB if it's not already present
943 options
.create_if_missing
= true;
945 // Useful for now as we are trying to compare uncompressed data savings on
947 options
.compression
= kNoCompression
;
949 // Prevent memtable in place updates. Should already be disabled
951 // In place updates can be enabled by toggling on the bool
952 // inplace_update_support flag. However, this flag is by default set to
954 // because this thread-safe in-place update support is not compatible
955 // with concurrent memtable writes. Note that the bool
956 // allow_concurrent_memtable_write is set to true by default )
957 options
.inplace_update_support
= false;
958 options
.allow_concurrent_memtable_write
= true;
960 // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
961 options
.write_buffer_size
= 1 << 20;
962 // Initially deactivate the MemPurge prototype.
963 // (negative values are equivalent to 0.0).
964 options
.experimental_mempurge_threshold
= -25.3;
965 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
966 options
.listeners
.emplace_back(listener
);
968 ASSERT_OK(TryReopen(options
));
969 // Dynamically activate the MemPurge prototype without restarting the DB.
970 ColumnFamilyHandle
* cfh
= db_
->DefaultColumnFamily();
971 // Values greater than 1.0 are equivalent to 1.0
973 db_
->SetOptions(cfh
, {{"experimental_mempurge_threshold", "3.7898"}}));
974 std::atomic
<uint32_t> mempurge_count
{0};
975 std::atomic
<uint32_t> sst_count
{0};
976 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
977 "DBImpl::FlushJob:MemPurgeSuccessful",
978 [&](void* /*arg*/) { mempurge_count
++; });
979 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
980 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
981 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
982 const size_t KVSIZE
= 3;
983 std::vector
<std::string
> KEYS(KVSIZE
);
984 for (size_t k
= 0; k
< KVSIZE
; k
++) {
985 KEYS
[k
] = "IamKey" + std::to_string(k
);
988 std::vector
<std::string
> RNDVALS(KVSIZE
);
989 const std::string NOT_FOUND
= "NOT_FOUND";
991 // Heavy overwrite workload,
992 // more than would fit in maximum allowed memtables.
994 const size_t NUM_REPEAT
= 100;
995 const size_t RAND_VALUES_LENGTH
= 10240;
997 // Insertion of of K-V pairs, multiple times (overwrites).
998 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
999 for (size_t j
= 0; j
< KEYS
.size(); j
++) {
1000 RNDVALS
[j
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1001 ASSERT_OK(Put(KEYS
[j
], RNDVALS
[j
]));
1002 ASSERT_EQ(Get(KEYS
[j
]), RNDVALS
[j
]);
1004 for (size_t j
= 0; j
< KEYS
.size(); j
++) {
1005 ASSERT_EQ(Get(KEYS
[j
]), RNDVALS
[j
]);
1009 // Check that there was at least one mempurge
1010 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT
= 1;
1011 // Check that there was no SST files created during flush.
1012 const uint32_t EXPECTED_SST_COUNT
= 0;
1014 EXPECT_GE(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
1015 EXPECT_EQ(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1017 // Dynamically deactivate MemPurge.
1019 db_
->SetOptions(cfh
, {{"experimental_mempurge_threshold", "-1023.0"}}));
1021 // Insertion of of K-V pairs, multiple times (overwrites).
1022 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
1023 for (size_t j
= 0; j
< KEYS
.size(); j
++) {
1024 RNDVALS
[j
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1025 ASSERT_OK(Put(KEYS
[j
], RNDVALS
[j
]));
1026 ASSERT_EQ(Get(KEYS
[j
]), RNDVALS
[j
]);
1028 for (size_t j
= 0; j
< KEYS
.size(); j
++) {
1029 ASSERT_EQ(Get(KEYS
[j
]), RNDVALS
[j
]);
1033 // Check that there was at least one mempurge
1034 const uint32_t ZERO
= 0;
1035 // Assert that at least one flush to storage has been performed
1036 EXPECT_GT(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1037 // The mempurge count is expected to be set to 0 when the options are updated.
1038 // We expect no mempurge at all.
1039 EXPECT_EQ(mempurge_count
.exchange(0), ZERO
);
1043 // Closes the "#ifndef ROCKSDB_LITE"
1044 // End of MemPurgeBasicToggle, which is not
1045 // supported with RocksDB LITE because it
1046 // relies on dynamically changing the option
1047 // flag experimental_mempurge_threshold.
1050 // At the moment, MemPurge feature is deactivated
1051 // when atomic_flush is enabled. This is because the level
1052 // of garbage between Column Families is not guaranteed to
1053 // be consistent, therefore a CF could hypothetically
1054 // trigger a MemPurge while another CF would trigger
1056 TEST_F(DBFlushTest
, MemPurgeWithAtomicFlush
) {
1057 Options options
= CurrentOptions();
1059 // The following options are used to enforce several values that
1060 // may already exist as default values to make this test resilient
1061 // to default value updates in the future.
1062 options
.statistics
= CreateDBStatistics();
1064 // Record all statistics.
1065 options
.statistics
->set_stats_level(StatsLevel::kAll
);
1067 // create the DB if it's not already present
1068 options
.create_if_missing
= true;
1070 // Useful for now as we are trying to compare uncompressed data savings on
1072 options
.compression
= kNoCompression
;
1074 // Prevent memtable in place updates. Should already be disabled
1076 // In place updates can be enabled by toggling on the bool
1077 // inplace_update_support flag. However, this flag is by default set to
1079 // because this thread-safe in-place update support is not compatible
1080 // with concurrent memtable writes. Note that the bool
1081 // allow_concurrent_memtable_write is set to true by default )
1082 options
.inplace_update_support
= false;
1083 options
.allow_concurrent_memtable_write
= true;
1085 // Enforce size of a single MemTable to 64KB (64KB = 65,536 bytes).
1086 options
.write_buffer_size
= 1 << 20;
1087 // Activate the MemPurge prototype.
1088 options
.experimental_mempurge_threshold
= 153.245;
1089 // Activate atomic_flush.
1090 options
.atomic_flush
= true;
1092 const std::vector
<std::string
> new_cf_names
= {"pikachu", "eevie"};
1093 CreateColumnFamilies(new_cf_names
, options
);
1097 // 3 CFs: default will be filled with overwrites (would normally trigger
1099 // new_cf_names[1] will be filled with random values (would trigger
1100 // flush) new_cf_names[2] not filled with anything.
1101 ReopenWithColumnFamilies(
1102 {kDefaultColumnFamilyName
, new_cf_names
[0], new_cf_names
[1]}, options
);
1103 size_t num_cfs
= handles_
.size();
1104 ASSERT_EQ(3, num_cfs
);
1105 ASSERT_OK(Put(1, "foo", "bar"));
1106 ASSERT_OK(Put(2, "bar", "baz"));
1108 std::atomic
<uint32_t> mempurge_count
{0};
1109 std::atomic
<uint32_t> sst_count
{0};
1110 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1111 "DBImpl::FlushJob:MemPurgeSuccessful",
1112 [&](void* /*arg*/) { mempurge_count
++; });
1113 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1114 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
1115 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1117 const size_t KVSIZE
= 3;
1118 std::vector
<std::string
> KEYS(KVSIZE
);
1119 for (size_t k
= 0; k
< KVSIZE
; k
++) {
1120 KEYS
[k
] = "IamKey" + std::to_string(k
);
1124 std::vector
<std::string
> RNDVALS(KVSIZE
);
1125 const std::string NOT_FOUND
= "NOT_FOUND";
1127 // Heavy overwrite workload,
1128 // more than would fit in maximum allowed memtables.
1130 const size_t NUM_REPEAT
= 100;
1131 const size_t RAND_KEY_LENGTH
= 128;
1132 const size_t RAND_VALUES_LENGTH
= 10240;
1134 // Insertion of of K-V pairs, multiple times (overwrites).
1135 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
1136 for (size_t j
= 0; j
< KEYS
.size(); j
++) {
1137 RNDKEY
= rnd
.RandomString(RAND_KEY_LENGTH
);
1138 RNDVALS
[j
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1139 ASSERT_OK(Put(KEYS
[j
], RNDVALS
[j
]));
1140 ASSERT_OK(Put(1, RNDKEY
, RNDVALS
[j
]));
1141 ASSERT_EQ(Get(KEYS
[j
]), RNDVALS
[j
]);
1142 ASSERT_EQ(Get(1, RNDKEY
), RNDVALS
[j
]);
1146 // Check that there was no mempurge because atomic_flush option is true.
1147 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT
= 0;
1148 // Check that there was at least one SST files created during flush.
1149 const uint32_t EXPECTED_SST_COUNT
= 1;
1151 EXPECT_EQ(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
1152 EXPECT_GE(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1157 TEST_F(DBFlushTest
, MemPurgeDeleteAndDeleteRange
) {
1158 Options options
= CurrentOptions();
1160 options
.statistics
= CreateDBStatistics();
1161 options
.statistics
->set_stats_level(StatsLevel::kAll
);
1162 options
.create_if_missing
= true;
1163 options
.compression
= kNoCompression
;
1164 options
.inplace_update_support
= false;
1165 options
.allow_concurrent_memtable_write
= true;
1166 #ifndef ROCKSDB_LITE
1167 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
1168 options
.listeners
.emplace_back(listener
);
1169 #endif // !ROCKSDB_LITE
1170 // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
1171 options
.write_buffer_size
= 1 << 20;
1172 // Activate the MemPurge prototype.
1173 options
.experimental_mempurge_threshold
= 15.0;
1175 ASSERT_OK(TryReopen(options
));
1177 std::atomic
<uint32_t> mempurge_count
{0};
1178 std::atomic
<uint32_t> sst_count
{0};
1179 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1180 "DBImpl::FlushJob:MemPurgeSuccessful",
1181 [&](void* /*arg*/) { mempurge_count
++; });
1182 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1183 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
1184 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1186 std::string KEY1
= "ThisIsKey1";
1187 std::string KEY2
= "ThisIsKey2";
1188 std::string KEY3
= "ThisIsKey3";
1189 std::string KEY4
= "ThisIsKey4";
1190 std::string KEY5
= "ThisIsKey5";
1191 const std::string NOT_FOUND
= "NOT_FOUND";
1194 const size_t NUM_REPEAT
= 100;
1195 const size_t RAND_VALUES_LENGTH
= 10240;
1197 std::string key
, value
, p_v1
, p_v2
, p_v3
, p_v3b
, p_v4
, p_v5
;
1199 const int EXPECTED_COUNT_FORLOOP
= 3;
1200 const int EXPECTED_COUNT_END
= 4;
1203 ropt
.pin_data
= true;
1204 ropt
.total_order_seek
= true;
1205 Iterator
* iter
= nullptr;
1207 // Insertion of of K-V pairs, multiple times.
1208 // Also insert DeleteRange
1209 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
1210 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
1211 p_v1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1212 p_v2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1213 p_v3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1214 p_v3b
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1215 p_v4
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1216 p_v5
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1217 ASSERT_OK(Put(KEY1
, p_v1
));
1218 ASSERT_OK(Put(KEY2
, p_v2
));
1219 ASSERT_OK(Put(KEY3
, p_v3
));
1220 ASSERT_OK(Put(KEY4
, p_v4
));
1221 ASSERT_OK(Put(KEY5
, p_v5
));
1222 ASSERT_OK(Delete(KEY2
));
1223 ASSERT_OK(db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(), KEY2
,
1225 ASSERT_OK(Put(KEY3
, p_v3b
));
1226 ASSERT_OK(db_
->DeleteRange(WriteOptions(), db_
->DefaultColumnFamily(), KEY1
,
1228 ASSERT_OK(Delete(KEY1
));
1230 ASSERT_EQ(Get(KEY1
), NOT_FOUND
);
1231 ASSERT_EQ(Get(KEY2
), NOT_FOUND
);
1232 ASSERT_EQ(Get(KEY3
), p_v3b
);
1233 ASSERT_EQ(Get(KEY4
), p_v4
);
1234 ASSERT_EQ(Get(KEY5
), p_v5
);
1236 iter
= db_
->NewIterator(ropt
);
1237 iter
->SeekToFirst();
1239 for (; iter
->Valid(); iter
->Next()) {
1240 ASSERT_OK(iter
->status());
1241 key
= (iter
->key()).ToString(false);
1242 value
= (iter
->value()).ToString(false);
1243 if (key
.compare(KEY3
) == 0)
1244 ASSERT_EQ(value
, p_v3b
);
1245 else if (key
.compare(KEY4
) == 0)
1246 ASSERT_EQ(value
, p_v4
);
1247 else if (key
.compare(KEY5
) == 0)
1248 ASSERT_EQ(value
, p_v5
);
1250 ASSERT_EQ(value
, NOT_FOUND
);
1254 // Expected count here is 3: KEY3, KEY4, KEY5.
1255 ASSERT_EQ(count
, EXPECTED_COUNT_FORLOOP
);
1261 // Check that there was at least one mempurge
1262 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT
= 1;
1263 // Check that there was no SST files created during flush.
1264 const uint32_t EXPECTED_SST_COUNT
= 0;
1266 EXPECT_GE(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
1267 EXPECT_EQ(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1269 // Additional test for the iterator+memPurge.
1270 ASSERT_OK(Put(KEY2
, p_v2
));
1271 iter
= db_
->NewIterator(ropt
);
1272 iter
->SeekToFirst();
1273 ASSERT_OK(Put(KEY4
, p_v4
));
1275 for (; iter
->Valid(); iter
->Next()) {
1276 ASSERT_OK(iter
->status());
1277 key
= (iter
->key()).ToString(false);
1278 value
= (iter
->value()).ToString(false);
1279 if (key
.compare(KEY2
) == 0)
1280 ASSERT_EQ(value
, p_v2
);
1281 else if (key
.compare(KEY3
) == 0)
1282 ASSERT_EQ(value
, p_v3b
);
1283 else if (key
.compare(KEY4
) == 0)
1284 ASSERT_EQ(value
, p_v4
);
1285 else if (key
.compare(KEY5
) == 0)
1286 ASSERT_EQ(value
, p_v5
);
1288 ASSERT_EQ(value
, NOT_FOUND
);
1292 // Expected count here is 4: KEY2, KEY3, KEY4, KEY5.
1293 ASSERT_EQ(count
, EXPECTED_COUNT_END
);
1294 if (iter
) delete iter
;
1299 // Create a Compaction Fitler that will be invoked
1300 // at flush time and will update the value of a KV pair
1301 // if the key string is "lower" than the filter_key_ string.
1302 class ConditionalUpdateFilter
: public CompactionFilter
{
1304 explicit ConditionalUpdateFilter(const std::string
* filtered_key
)
1305 : filtered_key_(filtered_key
) {}
1306 bool Filter(int /*level*/, const Slice
& key
, const Slice
& /*value*/,
1307 std::string
* new_value
, bool* value_changed
) const override
{
1308 // If key<filtered_key_, update the value of the KV-pair.
1309 if (key
.compare(*filtered_key_
) < 0) {
1310 assert(new_value
!= nullptr);
1311 *new_value
= NEW_VALUE
;
1312 *value_changed
= true;
1314 return false /*do not remove this KV-pair*/;
1317 const char* Name() const override
{ return "ConditionalUpdateFilter"; }
1320 const std::string
* filtered_key_
;
1323 class ConditionalUpdateFilterFactory
: public CompactionFilterFactory
{
1325 explicit ConditionalUpdateFilterFactory(const Slice
& filtered_key
)
1326 : filtered_key_(filtered_key
.ToString()) {}
1328 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
1329 const CompactionFilter::Context
& /*context*/) override
{
1330 return std::unique_ptr
<CompactionFilter
>(
1331 new ConditionalUpdateFilter(&filtered_key_
));
1334 const char* Name() const override
{ return "ConditionalUpdateFilterFactory"; }
1336 bool ShouldFilterTableFileCreation(
1337 TableFileCreationReason reason
) const override
{
1338 // This compaction filter will be invoked
1339 // at flush time (and therefore at MemPurge time).
1340 return (reason
== TableFileCreationReason::kFlush
);
1344 std::string filtered_key_
;
1347 TEST_F(DBFlushTest
, MemPurgeAndCompactionFilter
) {
1348 Options options
= CurrentOptions();
1350 std::string KEY1
= "ThisIsKey1";
1351 std::string KEY2
= "ThisIsKey2";
1352 std::string KEY3
= "ThisIsKey3";
1353 std::string KEY4
= "ThisIsKey4";
1354 std::string KEY5
= "ThisIsKey5";
1355 std::string KEY6
= "ThisIsKey6";
1356 std::string KEY7
= "ThisIsKey7";
1357 std::string KEY8
= "ThisIsKey8";
1358 std::string KEY9
= "ThisIsKey9";
1359 const std::string NOT_FOUND
= "NOT_FOUND";
1361 options
.statistics
= CreateDBStatistics();
1362 options
.statistics
->set_stats_level(StatsLevel::kAll
);
1363 options
.create_if_missing
= true;
1364 options
.compression
= kNoCompression
;
1365 options
.inplace_update_support
= false;
1366 options
.allow_concurrent_memtable_write
= true;
1367 #ifndef ROCKSDB_LITE
1368 TestFlushListener
* listener
= new TestFlushListener(options
.env
, this);
1369 options
.listeners
.emplace_back(listener
);
1370 #endif // !ROCKSDB_LITE
1371 // Create a ConditionalUpdate compaction filter
1372 // that will update all the values of the KV pairs
1373 // where the keys are "lower" than KEY4.
1374 options
.compaction_filter_factory
=
1375 std::make_shared
<ConditionalUpdateFilterFactory
>(KEY4
);
1377 // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
1378 options
.write_buffer_size
= 1 << 20;
1379 // Activate the MemPurge prototype.
1380 options
.experimental_mempurge_threshold
= 26.55;
1382 ASSERT_OK(TryReopen(options
));
1384 std::atomic
<uint32_t> mempurge_count
{0};
1385 std::atomic
<uint32_t> sst_count
{0};
1386 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1387 "DBImpl::FlushJob:MemPurgeSuccessful",
1388 [&](void* /*arg*/) { mempurge_count
++; });
1389 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1390 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
1391 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1394 const size_t NUM_REPEAT
= 1000;
1395 const size_t RAND_VALUES_LENGTH
= 10240;
1396 std::string p_v1
, p_v2
, p_v3
, p_v4
, p_v5
, p_v6
, p_v7
, p_v8
, p_v9
;
1398 p_v1
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1399 p_v2
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1400 p_v3
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1401 p_v4
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1402 p_v5
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1403 ASSERT_OK(Put(KEY1
, p_v1
));
1404 ASSERT_OK(Put(KEY2
, p_v2
));
1405 ASSERT_OK(Put(KEY3
, p_v3
));
1406 ASSERT_OK(Put(KEY4
, p_v4
));
1407 ASSERT_OK(Put(KEY5
, p_v5
));
1408 ASSERT_OK(Delete(KEY1
));
1410 // Insertion of of K-V pairs, multiple times.
1411 for (size_t i
= 0; i
< NUM_REPEAT
; i
++) {
1412 // Create value strings of arbitrary
1413 // length RAND_VALUES_LENGTH bytes.
1414 p_v6
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1415 p_v7
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1416 p_v8
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1417 p_v9
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1418 ASSERT_OK(Put(KEY6
, p_v6
));
1419 ASSERT_OK(Put(KEY7
, p_v7
));
1420 ASSERT_OK(Put(KEY8
, p_v8
));
1421 ASSERT_OK(Put(KEY9
, p_v9
));
1423 ASSERT_OK(Delete(KEY7
));
1426 // Check that there was at least one mempurge
1427 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT
= 1;
1428 // Check that there was no SST files created during flush.
1429 const uint32_t EXPECTED_SST_COUNT
= 0;
1431 EXPECT_GE(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
1432 EXPECT_EQ(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1434 // Verify that the ConditionalUpdateCompactionFilter
1435 // updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
1436 ASSERT_EQ(Get(KEY1
), NOT_FOUND
);
1437 ASSERT_EQ(Get(KEY2
), NEW_VALUE
);
1438 ASSERT_EQ(Get(KEY3
), NEW_VALUE
);
1439 ASSERT_EQ(Get(KEY4
), p_v4
);
1440 ASSERT_EQ(Get(KEY5
), p_v5
);
1443 TEST_F(DBFlushTest
, DISABLED_MemPurgeWALSupport
) {
1444 Options options
= CurrentOptions();
1446 options
.statistics
= CreateDBStatistics();
1447 options
.statistics
->set_stats_level(StatsLevel::kAll
);
1448 options
.create_if_missing
= true;
1449 options
.compression
= kNoCompression
;
1450 options
.inplace_update_support
= false;
1451 options
.allow_concurrent_memtable_write
= true;
1453 // Enforce size of a single MemTable to 128KB.
1454 options
.write_buffer_size
= 128 << 10;
1455 // Activate the MemPurge prototype
1456 // (values >1.0 are equivalent to 1.0).
1457 options
.experimental_mempurge_threshold
= 2.5;
1459 ASSERT_OK(TryReopen(options
));
1461 const size_t KVSIZE
= 10;
1464 CreateAndReopenWithCF({"pikachu"}, options
);
1465 ASSERT_OK(Put(1, "foo", "v1"));
1466 ASSERT_OK(Put(1, "baz", "v5"));
1468 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1469 ASSERT_EQ("v1", Get(1, "foo"));
1471 ASSERT_EQ("v1", Get(1, "foo"));
1472 ASSERT_EQ("v5", Get(1, "baz"));
1473 ASSERT_OK(Put(0, "bar", "v2"));
1474 ASSERT_OK(Put(1, "bar", "v2"));
1475 ASSERT_OK(Put(1, "foo", "v3"));
1476 std::atomic
<uint32_t> mempurge_count
{0};
1477 std::atomic
<uint32_t> sst_count
{0};
1478 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1479 "DBImpl::FlushJob:MemPurgeSuccessful",
1480 [&](void* /*arg*/) { mempurge_count
++; });
1481 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1482 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
1483 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1485 std::vector
<std::string
> keys
;
1486 for (size_t k
= 0; k
< KVSIZE
; k
++) {
1487 keys
.push_back("IamKey" + std::to_string(k
));
1490 std::string RNDKEY
, RNDVALUE
;
1491 const std::string NOT_FOUND
= "NOT_FOUND";
1493 // Heavy overwrite workload,
1494 // more than would fit in maximum allowed memtables.
1496 const size_t NUM_REPEAT
= 100;
1497 const size_t RAND_KEY_LENGTH
= 4096;
1498 const size_t RAND_VALUES_LENGTH
= 1024;
1499 std::vector
<std::string
> values_default(KVSIZE
), values_pikachu(KVSIZE
);
1501 // Insert a very first set of keys that will be
1502 // mempurged at least once.
1503 for (size_t k
= 0; k
< KVSIZE
/ 2; k
++) {
1504 values_default
[k
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1505 values_pikachu
[k
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1508 // Insert keys[0:KVSIZE/2] to
1509 // both 'default' and 'pikachu' CFs.
1510 for (size_t k
= 0; k
< KVSIZE
/ 2; k
++) {
1511 ASSERT_OK(Put(0, keys
[k
], values_default
[k
]));
1512 ASSERT_OK(Put(1, keys
[k
], values_pikachu
[k
]));
1515 // Check that the insertion was seamless.
1516 for (size_t k
= 0; k
< KVSIZE
/ 2; k
++) {
1517 ASSERT_EQ(Get(0, keys
[k
]), values_default
[k
]);
1518 ASSERT_EQ(Get(1, keys
[k
]), values_pikachu
[k
]);
1521 // Insertion of of K-V pairs, multiple times (overwrites)
1522 // into 'default' CF. Will trigger mempurge.
1523 for (size_t j
= 0; j
< NUM_REPEAT
; j
++) {
1524 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
1525 for (size_t k
= KVSIZE
/ 2; k
< KVSIZE
; k
++) {
1526 values_default
[k
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1529 // Insert K-V into default CF.
1530 for (size_t k
= KVSIZE
/ 2; k
< KVSIZE
; k
++) {
1531 ASSERT_OK(Put(0, keys
[k
], values_default
[k
]));
1534 // Check key validity, for all keys, both in
1535 // default and pikachu CFs.
1536 for (size_t k
= 0; k
< KVSIZE
; k
++) {
1537 ASSERT_EQ(Get(0, keys
[k
]), values_default
[k
]);
1539 // Note that at this point, only keys[0:KVSIZE/2]
1540 // have been inserted into Pikachu.
1541 for (size_t k
= 0; k
< KVSIZE
/ 2; k
++) {
1542 ASSERT_EQ(Get(1, keys
[k
]), values_pikachu
[k
]);
1546 // Insertion of of K-V pairs, multiple times (overwrites)
1547 // into 'pikachu' CF. Will trigger mempurge.
1548 // Check that we keep the older logs for 'default' imm().
1549 for (size_t j
= 0; j
< NUM_REPEAT
; j
++) {
1550 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
1551 for (size_t k
= KVSIZE
/ 2; k
< KVSIZE
; k
++) {
1552 values_pikachu
[k
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1555 // Insert K-V into pikachu CF.
1556 for (size_t k
= KVSIZE
/ 2; k
< KVSIZE
; k
++) {
1557 ASSERT_OK(Put(1, keys
[k
], values_pikachu
[k
]));
1560 // Check key validity, for all keys,
1561 // both in default and pikachu.
1562 for (size_t k
= 0; k
< KVSIZE
; k
++) {
1563 ASSERT_EQ(Get(0, keys
[k
]), values_default
[k
]);
1564 ASSERT_EQ(Get(1, keys
[k
]), values_pikachu
[k
]);
1568 // Check that there was at least one mempurge
1569 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT
= 1;
1570 // Check that there was no SST files created during flush.
1571 const uint32_t EXPECTED_SST_COUNT
= 0;
1573 EXPECT_GE(mempurge_count
.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT
);
1574 if (options
.experimental_mempurge_threshold
==
1575 std::numeric_limits
<double>::max()) {
1576 EXPECT_EQ(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1579 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1580 // Check that there was no data corruption anywhere,
1581 // not in 'default' nor in 'Pikachu' CFs.
1582 ASSERT_EQ("v3", Get(1, "foo"));
1583 ASSERT_OK(Put(1, "foo", "v4"));
1584 ASSERT_EQ("v4", Get(1, "foo"));
1585 ASSERT_EQ("v2", Get(1, "bar"));
1586 ASSERT_EQ("v5", Get(1, "baz"));
1587 // Check keys in 'Default' and 'Pikachu'.
1588 // keys[0:KVSIZE/2] were for sure contained
1589 // in the imm() at Reopen/recovery time.
1590 for (size_t k
= 0; k
< KVSIZE
; k
++) {
1591 ASSERT_EQ(Get(0, keys
[k
]), values_default
[k
]);
1592 ASSERT_EQ(Get(1, keys
[k
]), values_pikachu
[k
]);
1594 // Insertion of random K-V pairs to trigger
1595 // a flush in the Pikachu CF.
1596 for (size_t j
= 0; j
< NUM_REPEAT
; j
++) {
1597 RNDKEY
= rnd
.RandomString(RAND_KEY_LENGTH
);
1598 RNDVALUE
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1599 ASSERT_OK(Put(1, RNDKEY
, RNDVALUE
));
1601 // ASsert than there was at least one flush to storage.
1602 EXPECT_GT(sst_count
.exchange(0), EXPECTED_SST_COUNT
);
1603 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
1604 ASSERT_EQ("v4", Get(1, "foo"));
1605 ASSERT_EQ("v2", Get(1, "bar"));
1606 ASSERT_EQ("v5", Get(1, "baz"));
1607 // Since values in default are held in mutable mem()
1608 // and imm(), check if the flush in pikachu didn't
1609 // affect these values.
1610 for (size_t k
= 0; k
< KVSIZE
; k
++) {
1611 ASSERT_EQ(Get(0, keys
[k
]), values_default
[k
]);
1612 ASSERT_EQ(Get(1, keys
[k
]), values_pikachu
[k
]);
1614 ASSERT_EQ(Get(1, RNDKEY
), RNDVALUE
);
1615 } while (ChangeWalOptions());
1618 TEST_F(DBFlushTest
, MemPurgeCorrectLogNumberAndSSTFileCreation
) {
1619 // Before our bug fix, we noticed that when 2 memtables were
1620 // being flushed (with one memtable being the output of a
1621 // previous MemPurge and one memtable being a newly-sealed memtable),
1622 // the SST file created was not properly added to the DB version
1623 // (via the VersionEdit obj), leading to data loss (the SST file
1624 // was later being purged as an obsolete file).
1625 // Therefore, we reproduce this scenario to test our fix.
1626 Options options
= CurrentOptions();
1628 options
.create_if_missing
= true;
1629 options
.compression
= kNoCompression
;
1630 options
.inplace_update_support
= false;
1631 options
.allow_concurrent_memtable_write
= true;
1633 // Enforce size of a single MemTable to 1MB (64MB = 1048576 bytes).
1634 options
.write_buffer_size
= 1 << 20;
1635 // Activate the MemPurge prototype.
1636 options
.experimental_mempurge_threshold
= 1.0;
1638 // Force to have more than one memtable to trigger a flush.
1639 // For some reason this option does not seem to be enforced,
1640 // so the following test is designed to make sure that we
1641 // are testing the correct test case.
1642 options
.min_write_buffer_number_to_merge
= 3;
1643 options
.max_write_buffer_number
= 5;
1644 options
.max_write_buffer_size_to_maintain
= 2 * (options
.write_buffer_size
);
1645 options
.disable_auto_compactions
= true;
1646 ASSERT_OK(TryReopen(options
));
1648 std::atomic
<uint32_t> mempurge_count
{0};
1649 std::atomic
<uint32_t> sst_count
{0};
1650 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1651 "DBImpl::FlushJob:MemPurgeSuccessful",
1652 [&](void* /*arg*/) { mempurge_count
++; });
1653 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1654 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count
++; });
1655 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1657 // Dummy variable used for the following callback function.
1659 // We will first execute mempurge operations exclusively.
1660 // Therefore, when the first flush is triggered, we want to make
1661 // sure there is at least 2 memtables being flushed: one output
1662 // from a previous mempurge, and one newly sealed memtable.
1663 // This is when we observed in the past that some SST files created
1664 // were not properly added to the DB version (via the VersionEdit obj).
1665 std::atomic
<uint64_t> num_memtable_at_first_flush(0);
1666 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1667 "FlushJob::WriteLevel0Table:num_memtables", [&](void* arg
) {
1668 uint64_t* mems_size
= reinterpret_cast<uint64_t*>(arg
);
1669 // atomic_compare_exchange_strong sometimes updates the value
1670 // of ZERO (the "expected" object), so we make sure ZERO is indeed...
1673 std::atomic_compare_exchange_strong(&num_memtable_at_first_flush
, &ZERO
,
1677 const std::vector
<std::string
> KEYS
= {
1678 "ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
1679 "ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};
1680 const std::string NOT_FOUND
= "NOT_FOUND";
1683 const uint64_t NUM_REPEAT_OVERWRITES
= 100;
1684 const uint64_t NUM_RAND_INSERTS
= 500;
1685 const uint64_t RAND_VALUES_LENGTH
= 10240;
1687 std::string key
, value
;
1688 std::vector
<std::string
> values(9, "");
1690 // Keys used to check that no SST file disappeared.
1691 for (uint64_t k
= 0; k
< 5; k
++) {
1692 values
[k
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1693 ASSERT_OK(Put(KEYS
[k
], values
[k
]));
1696 // Insertion of of K-V pairs, multiple times.
1697 // Trigger at least one mempurge and no SST file creation.
1698 for (size_t i
= 0; i
< NUM_REPEAT_OVERWRITES
; i
++) {
1699 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
1700 for (uint64_t k
= 5; k
< values
.size(); k
++) {
1701 values
[k
] = rnd
.RandomString(RAND_VALUES_LENGTH
);
1702 ASSERT_OK(Put(KEYS
[k
], values
[k
]));
1704 // Check database consistency.
1705 for (uint64_t k
= 0; k
< values
.size(); k
++) {
1706 ASSERT_EQ(Get(KEYS
[k
]), values
[k
]);
1710 // Check that there was at least one mempurge
1711 uint32_t expected_min_mempurge_count
= 1;
1712 // Check that there was no SST files created during flush.
1713 uint32_t expected_sst_count
= 0;
1714 EXPECT_GE(mempurge_count
.load(), expected_min_mempurge_count
);
1715 EXPECT_EQ(sst_count
.load(), expected_sst_count
);
1717 // Trigger an SST file creation and no mempurge.
1718 for (size_t i
= 0; i
< NUM_RAND_INSERTS
; i
++) {
1719 key
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1720 // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
1721 value
= rnd
.RandomString(RAND_VALUES_LENGTH
);
1722 ASSERT_OK(Put(key
, value
));
1723 // Check database consistency.
1724 for (uint64_t k
= 0; k
< values
.size(); k
++) {
1725 ASSERT_EQ(Get(KEYS
[k
]), values
[k
]);
1727 ASSERT_EQ(Get(key
), value
);
1730 // Check that there was at least one SST files created during flush.
1731 expected_sst_count
= 1;
1732 EXPECT_GE(sst_count
.load(), expected_sst_count
);
1734 // Oddly enough, num_memtable_at_first_flush is not enforced to be
1735 // equal to min_write_buffer_number_to_merge. So by asserting that
1736 // the first SST file creation comes from one output memtable
1737 // from a previous mempurge, and one newly sealed memtable. This
1738 // is the scenario where we observed that some SST files created
1739 // were not properly added to the DB version before our bug fix.
1740 ASSERT_GE(num_memtable_at_first_flush
.load(), 2);
1742 // Check that no data was lost after SST file creation.
1743 for (uint64_t k
= 0; k
< values
.size(); k
++) {
1744 ASSERT_EQ(Get(KEYS
[k
]), values
[k
]);
1746 // Extra check of database consistency.
1747 ASSERT_EQ(Get(key
), value
);
1752 TEST_P(DBFlushDirectIOTest
, DirectIO
) {
1754 options
.create_if_missing
= true;
1755 options
.disable_auto_compactions
= true;
1756 options
.max_background_flushes
= 2;
1757 options
.use_direct_io_for_flush_and_compaction
= GetParam();
1758 options
.env
= MockEnv::Create(Env::Default());
1759 SyncPoint::GetInstance()->SetCallBack(
1760 "BuildTable:create_file", [&](void* arg
) {
1761 bool* use_direct_writes
= static_cast<bool*>(arg
);
1762 ASSERT_EQ(*use_direct_writes
,
1763 options
.use_direct_io_for_flush_and_compaction
);
1766 SyncPoint::GetInstance()->EnableProcessing();
1768 ASSERT_OK(Put("foo", "v"));
1769 FlushOptions flush_options
;
1770 flush_options
.wait
= true;
1771 ASSERT_OK(dbfull()->Flush(flush_options
));
1776 TEST_F(DBFlushTest
, FlushError
) {
1778 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
1779 new FaultInjectionTestEnv(env_
));
1780 options
.write_buffer_size
= 100;
1781 options
.max_write_buffer_number
= 4;
1782 options
.min_write_buffer_number_to_merge
= 3;
1783 options
.disable_auto_compactions
= true;
1784 options
.env
= fault_injection_env
.get();
1787 ASSERT_OK(Put("key1", "value1"));
1788 ASSERT_OK(Put("key2", "value2"));
1789 fault_injection_env
->SetFilesystemActive(false);
1790 Status s
= dbfull()->TEST_SwitchMemtable();
1791 fault_injection_env
->SetFilesystemActive(true);
1793 ASSERT_NE(s
, Status::OK());
1796 TEST_F(DBFlushTest
, ManualFlushFailsInReadOnlyMode
) {
1797 // Regression test for bug where manual flush hangs forever when the DB
1798 // is in read-only mode. Verify it now at least returns, despite failing.
1800 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
1801 new FaultInjectionTestEnv(env_
));
1802 options
.env
= fault_injection_env
.get();
1803 options
.max_write_buffer_number
= 2;
1806 // Trigger a first flush but don't let it run
1807 ASSERT_OK(db_
->PauseBackgroundWork());
1808 ASSERT_OK(Put("key1", "value1"));
1809 FlushOptions flush_opts
;
1810 flush_opts
.wait
= false;
1811 ASSERT_OK(db_
->Flush(flush_opts
));
1813 // Write a key to the second memtable so we have something to flush later
1814 // after the DB is in read-only mode.
1815 ASSERT_OK(Put("key2", "value2"));
1817 // Let the first flush continue, hit an error, and put the DB in read-only
1819 fault_injection_env
->SetFilesystemActive(false);
1820 ASSERT_OK(db_
->ContinueBackgroundWork());
1821 // We ingested the error to env, so the returned status is not OK.
1822 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
1823 #ifndef ROCKSDB_LITE
1824 uint64_t num_bg_errors
;
1826 db_
->GetIntProperty(DB::Properties::kBackgroundErrors
, &num_bg_errors
));
1827 ASSERT_GT(num_bg_errors
, 0);
1828 #endif // ROCKSDB_LITE
1830 // In the bug scenario, triggering another flush would cause the second flush
1831 // to hang forever. After the fix we expect it to return an error.
1832 ASSERT_NOK(db_
->Flush(FlushOptions()));
1837 TEST_F(DBFlushTest
, CFDropRaceWithWaitForFlushMemTables
) {
1838 Options options
= CurrentOptions();
1839 options
.create_if_missing
= true;
1840 CreateAndReopenWithCF({"pikachu"}, options
);
1841 SyncPoint::GetInstance()->DisableProcessing();
1842 SyncPoint::GetInstance()->LoadDependency(
1843 {{"DBImpl::FlushMemTable:AfterScheduleFlush",
1844 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
1845 {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
1846 "DBImpl::BackgroundCallFlush:start"},
1847 {"DBImpl::BackgroundCallFlush:start",
1848 "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
1849 SyncPoint::GetInstance()->EnableProcessing();
1850 ASSERT_EQ(2, handles_
.size());
1851 ASSERT_OK(Put(1, "key", "value"));
1852 auto* cfd
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1])->cfd();
1853 port::Thread
drop_cf_thr([&]() {
1855 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
1856 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
1857 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_
[1]));
1860 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
1862 FlushOptions flush_opts
;
1863 flush_opts
.allow_write_stall
= true;
1864 ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd
, flush_opts
));
1867 SyncPoint::GetInstance()->DisableProcessing();
1870 #ifndef ROCKSDB_LITE
1871 TEST_F(DBFlushTest
, FireOnFlushCompletedAfterCommittedResult
) {
1872 class TestListener
: public EventListener
{
1874 void OnFlushCompleted(DB
* db
, const FlushJobInfo
& info
) override
{
1875 // There's only one key in each flush.
1876 ASSERT_EQ(info
.smallest_seqno
, info
.largest_seqno
);
1877 ASSERT_NE(0, info
.smallest_seqno
);
1878 if (info
.smallest_seqno
== seq1
) {
1879 // First flush completed
1880 ASSERT_FALSE(completed1
);
1882 CheckFlushResultCommitted(db
, seq1
);
1884 // Second flush completed
1885 ASSERT_FALSE(completed2
);
1887 ASSERT_EQ(info
.smallest_seqno
, seq2
);
1888 CheckFlushResultCommitted(db
, seq2
);
1892 void CheckFlushResultCommitted(DB
* db
, SequenceNumber seq
) {
1893 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
);
1894 InstrumentedMutex
* mutex
= db_impl
->mutex();
1896 auto* cfd
= static_cast_with_check
<ColumnFamilyHandleImpl
>(
1897 db
->DefaultColumnFamily())
1899 ASSERT_LT(seq
, cfd
->imm()->current()->GetEarliestSequenceNumber());
1903 std::atomic
<SequenceNumber
> seq1
{0};
1904 std::atomic
<SequenceNumber
> seq2
{0};
1905 std::atomic
<bool> completed1
{false};
1906 std::atomic
<bool> completed2
{false};
1908 std::shared_ptr
<TestListener
> listener
= std::make_shared
<TestListener
>();
1910 SyncPoint::GetInstance()->LoadDependency(
1911 {{"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
1912 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
1913 {"DBImpl::FlushMemTableToOutputFile:Finish",
1914 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
1915 SyncPoint::GetInstance()->SetCallBack(
1916 "FlushJob::WriteLevel0Table", [&listener
](void* arg
) {
1917 // Wait for the second flush finished, out of mutex.
1918 auto* mems
= reinterpret_cast<autovector
<MemTable
*>*>(arg
);
1919 if (mems
->front()->GetEarliestSequenceNumber() == listener
->seq1
- 1) {
1921 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
1926 Options options
= CurrentOptions();
1927 options
.create_if_missing
= true;
1928 options
.listeners
.push_back(listener
);
1929 // Setting max_flush_jobs = max_background_jobs / 4 = 2.
1930 options
.max_background_jobs
= 8;
1931 // Allow 2 immutable memtables.
1932 options
.max_write_buffer_number
= 3;
1934 SyncPoint::GetInstance()->EnableProcessing();
1935 ASSERT_OK(Put("foo", "v"));
1936 listener
->seq1
= db_
->GetLatestSequenceNumber();
1937 // t1 will wait for the second flush complete before committing flush result.
1938 auto t1
= port::Thread([&]() {
1939 // flush_opts.wait = true
1940 ASSERT_OK(db_
->Flush(FlushOptions()));
1942 // Wait for first flush started.
1944 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
1945 // The second flush will exit early without commit its result. The work
1946 // is delegated to the first flush.
1947 ASSERT_OK(Put("bar", "v"));
1948 listener
->seq2
= db_
->GetLatestSequenceNumber();
1949 FlushOptions flush_opts
;
1950 flush_opts
.wait
= false;
1951 ASSERT_OK(db_
->Flush(flush_opts
));
1953 // Ensure background work is fully finished including listener callbacks
1954 // before accessing listener state.
1955 ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
1956 ASSERT_TRUE(listener
->completed1
);
1957 ASSERT_TRUE(listener
->completed2
);
1958 SyncPoint::GetInstance()->DisableProcessing();
1959 SyncPoint::GetInstance()->ClearAllCallBacks();
1961 #endif // !ROCKSDB_LITE
1963 TEST_F(DBFlushTest
, FlushWithBlob
) {
1964 constexpr uint64_t min_blob_size
= 10;
1967 options
.enable_blob_files
= true;
1968 options
.min_blob_size
= min_blob_size
;
1969 options
.disable_auto_compactions
= true;
1974 constexpr char short_value
[] = "short";
1975 static_assert(sizeof(short_value
) - 1 < min_blob_size
,
1976 "short_value too long");
1978 constexpr char long_value
[] = "long_value";
1979 static_assert(sizeof(long_value
) - 1 >= min_blob_size
,
1980 "long_value too short");
1982 ASSERT_OK(Put("key1", short_value
));
1983 ASSERT_OK(Put("key2", long_value
));
1987 ASSERT_EQ(Get("key1"), short_value
);
1988 ASSERT_EQ(Get("key2"), long_value
);
1990 VersionSet
* const versions
= dbfull()->GetVersionSet();
1993 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
1996 Version
* const current
= cfd
->current();
1999 const VersionStorageInfo
* const storage_info
= current
->storage_info();
2000 assert(storage_info
);
2002 const auto& l0_files
= storage_info
->LevelFiles(0);
2003 ASSERT_EQ(l0_files
.size(), 1);
2005 const FileMetaData
* const table_file
= l0_files
[0];
2008 const auto& blob_files
= storage_info
->GetBlobFiles();
2009 ASSERT_EQ(blob_files
.size(), 1);
2011 const auto& blob_file
= blob_files
.front();
2014 ASSERT_EQ(table_file
->smallest
.user_key(), "key1");
2015 ASSERT_EQ(table_file
->largest
.user_key(), "key2");
2016 ASSERT_EQ(table_file
->fd
.smallest_seqno
, 1);
2017 ASSERT_EQ(table_file
->fd
.largest_seqno
, 2);
2018 ASSERT_EQ(table_file
->oldest_blob_file_number
,
2019 blob_file
->GetBlobFileNumber());
2021 ASSERT_EQ(blob_file
->GetTotalBlobCount(), 1);
2023 #ifndef ROCKSDB_LITE
2024 const InternalStats
* const internal_stats
= cfd
->internal_stats();
2025 assert(internal_stats
);
2027 const auto& compaction_stats
= internal_stats
->TEST_GetCompactionStats();
2028 ASSERT_FALSE(compaction_stats
.empty());
2029 ASSERT_EQ(compaction_stats
[0].bytes_written
, table_file
->fd
.GetFileSize());
2030 ASSERT_EQ(compaction_stats
[0].bytes_written_blob
,
2031 blob_file
->GetTotalBlobBytes());
2032 ASSERT_EQ(compaction_stats
[0].num_output_files
, 1);
2033 ASSERT_EQ(compaction_stats
[0].num_output_files_blob
, 1);
2035 const uint64_t* const cf_stats_value
= internal_stats
->TEST_GetCFStatsValue();
2036 ASSERT_EQ(cf_stats_value
[InternalStats::BYTES_FLUSHED
],
2037 compaction_stats
[0].bytes_written
+
2038 compaction_stats
[0].bytes_written_blob
);
2039 #endif // ROCKSDB_LITE
2042 TEST_F(DBFlushTest
, FlushWithChecksumHandoff1
) {
2043 if (mem_env_
|| encrypted_env_
) {
2044 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2047 std::shared_ptr
<FaultInjectionTestFS
> fault_fs(
2048 new FaultInjectionTestFS(FileSystem::Default()));
2049 std::unique_ptr
<Env
> fault_fs_env(NewCompositeEnv(fault_fs
));
2050 Options options
= CurrentOptions();
2051 options
.write_buffer_size
= 100;
2052 options
.max_write_buffer_number
= 4;
2053 options
.min_write_buffer_number_to_merge
= 3;
2054 options
.disable_auto_compactions
= true;
2055 options
.env
= fault_fs_env
.get();
2056 options
.checksum_handoff_file_types
.Add(FileType::kTableFile
);
2059 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kCRC32c
);
2060 ASSERT_OK(Put("key1", "value1"));
2061 ASSERT_OK(Put("key2", "value2"));
2062 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
2064 // The hash does not match, write fails
2065 // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
2066 // Since the file system returns IOStatus::Corruption, it is an
2067 // unrecoverable error.
2068 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2069 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kxxHash
);
2071 ASSERT_OK(Put("key3", "value3"));
2072 ASSERT_OK(Put("key4", "value4"));
2073 SyncPoint::GetInstance()->EnableProcessing();
2075 ASSERT_EQ(s
.severity(),
2076 ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError
);
2077 SyncPoint::GetInstance()->DisableProcessing();
2081 // The file system does not support checksum handoff. The check
2083 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum
);
2084 ASSERT_OK(Put("key5", "value5"));
2085 ASSERT_OK(Put("key6", "value6"));
2086 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
2088 // Each write will be similated as corrupted.
2089 // Since the file system returns IOStatus::Corruption, it is an
2090 // unrecoverable error.
2091 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kCRC32c
);
2092 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2093 fault_fs
->IngestDataCorruptionBeforeWrite();
2095 ASSERT_OK(Put("key7", "value7"));
2096 ASSERT_OK(Put("key8", "value8"));
2097 SyncPoint::GetInstance()->EnableProcessing();
2099 ASSERT_EQ(s
.severity(),
2100 ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError
);
2101 SyncPoint::GetInstance()->DisableProcessing();
2106 TEST_F(DBFlushTest
, FlushWithChecksumHandoff2
) {
2107 if (mem_env_
|| encrypted_env_
) {
2108 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2111 std::shared_ptr
<FaultInjectionTestFS
> fault_fs(
2112 new FaultInjectionTestFS(FileSystem::Default()));
2113 std::unique_ptr
<Env
> fault_fs_env(NewCompositeEnv(fault_fs
));
2114 Options options
= CurrentOptions();
2115 options
.write_buffer_size
= 100;
2116 options
.max_write_buffer_number
= 4;
2117 options
.min_write_buffer_number_to_merge
= 3;
2118 options
.disable_auto_compactions
= true;
2119 options
.env
= fault_fs_env
.get();
2122 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kCRC32c
);
2123 ASSERT_OK(Put("key1", "value1"));
2124 ASSERT_OK(Put("key2", "value2"));
2127 // options is not set, the checksum handoff will not be triggered
2128 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2129 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kxxHash
);
2131 ASSERT_OK(Put("key3", "value3"));
2132 ASSERT_OK(Put("key4", "value4"));
2133 SyncPoint::GetInstance()->EnableProcessing();
2135 SyncPoint::GetInstance()->DisableProcessing();
2139 // The file system does not support checksum handoff. The check
2141 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum
);
2142 ASSERT_OK(Put("key5", "value5"));
2143 ASSERT_OK(Put("key6", "value6"));
2146 // options is not set, the checksum handoff will not be triggered
2147 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kCRC32c
);
2148 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2149 fault_fs
->IngestDataCorruptionBeforeWrite();
2151 ASSERT_OK(Put("key7", "value7"));
2152 ASSERT_OK(Put("key8", "value8"));
2153 SyncPoint::GetInstance()->EnableProcessing();
2155 SyncPoint::GetInstance()->DisableProcessing();
2160 TEST_F(DBFlushTest
, FlushWithChecksumHandoffManifest1
) {
2161 if (mem_env_
|| encrypted_env_
) {
2162 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2165 std::shared_ptr
<FaultInjectionTestFS
> fault_fs(
2166 new FaultInjectionTestFS(FileSystem::Default()));
2167 std::unique_ptr
<Env
> fault_fs_env(NewCompositeEnv(fault_fs
));
2168 Options options
= CurrentOptions();
2169 options
.write_buffer_size
= 100;
2170 options
.max_write_buffer_number
= 4;
2171 options
.min_write_buffer_number_to_merge
= 3;
2172 options
.disable_auto_compactions
= true;
2173 options
.env
= fault_fs_env
.get();
2174 options
.checksum_handoff_file_types
.Add(FileType::kDescriptorFile
);
2175 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kCRC32c
);
2178 ASSERT_OK(Put("key1", "value1"));
2179 ASSERT_OK(Put("key2", "value2"));
2182 // The hash does not match, write fails
2183 // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
2184 // Since the file system returns IOStatus::Corruption, it is mapped to
2185 // kFatalError error.
2186 ASSERT_OK(Put("key3", "value3"));
2187 SyncPoint::GetInstance()->SetCallBack(
2188 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2189 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kxxHash
);
2191 ASSERT_OK(Put("key3", "value3"));
2192 ASSERT_OK(Put("key4", "value4"));
2193 SyncPoint::GetInstance()->EnableProcessing();
2195 ASSERT_EQ(s
.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError
);
2196 SyncPoint::GetInstance()->DisableProcessing();
2200 TEST_F(DBFlushTest
, FlushWithChecksumHandoffManifest2
) {
2201 if (mem_env_
|| encrypted_env_
) {
2202 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2205 std::shared_ptr
<FaultInjectionTestFS
> fault_fs(
2206 new FaultInjectionTestFS(FileSystem::Default()));
2207 std::unique_ptr
<Env
> fault_fs_env(NewCompositeEnv(fault_fs
));
2208 Options options
= CurrentOptions();
2209 options
.write_buffer_size
= 100;
2210 options
.max_write_buffer_number
= 4;
2211 options
.min_write_buffer_number_to_merge
= 3;
2212 options
.disable_auto_compactions
= true;
2213 options
.env
= fault_fs_env
.get();
2214 options
.checksum_handoff_file_types
.Add(FileType::kDescriptorFile
);
2215 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum
);
2217 // The file system does not support checksum handoff. The check
2219 ASSERT_OK(Put("key5", "value5"));
2220 ASSERT_OK(Put("key6", "value6"));
2223 // Each write will be similated as corrupted.
2224 // Since the file system returns IOStatus::Corruption, it is mapped to
2225 // kFatalError error.
2226 fault_fs
->SetChecksumHandoffFuncType(ChecksumType::kCRC32c
);
2227 SyncPoint::GetInstance()->SetCallBack(
2228 "VersionSet::LogAndApply:WriteManifest",
2229 [&](void*) { fault_fs
->IngestDataCorruptionBeforeWrite(); });
2230 ASSERT_OK(Put("key7", "value7"));
2231 ASSERT_OK(Put("key8", "value8"));
2232 SyncPoint::GetInstance()->EnableProcessing();
2234 ASSERT_EQ(s
.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError
);
2235 SyncPoint::GetInstance()->DisableProcessing();
2240 TEST_F(DBFlushTest
, PickRightMemtables
) {
2241 Options options
= CurrentOptions();
2242 DestroyAndReopen(options
);
2243 options
.create_if_missing
= true;
2245 const std::string test_cf_name
= "test_cf";
2246 options
.max_write_buffer_number
= 128;
2247 CreateColumnFamilies({test_cf_name
}, options
);
2251 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, test_cf_name
}, options
);
2253 ASSERT_OK(db_
->Put(WriteOptions(), "key", "value"));
2255 ASSERT_OK(db_
->Put(WriteOptions(), handles_
[1], "key", "value"));
2257 SyncPoint::GetInstance()->DisableProcessing();
2258 SyncPoint::GetInstance()->ClearAllCallBacks();
2259 SyncPoint::GetInstance()->SetCallBack(
2260 "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
2261 ASSERT_OK(db_
->Put(WriteOptions(), handles_
[1], "what", "v"));
2263 static_cast_with_check
<ColumnFamilyHandleImpl
>(handles_
[1]);
2265 ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi
->cfd()));
2267 SyncPoint::GetInstance()->SetCallBack(
2268 "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg
) {
2269 auto* job
= reinterpret_cast<FlushJob
*>(arg
);
2271 const auto& mems
= job
->GetMemTables();
2272 assert(mems
.size() == 1);
2274 ASSERT_EQ(1, mems
[0]->GetID());
2276 SyncPoint::GetInstance()->EnableProcessing();
2278 ASSERT_OK(db_
->Flush(FlushOptions(), handles_
[1]));
2280 SyncPoint::GetInstance()->DisableProcessing();
2281 SyncPoint::GetInstance()->ClearAllCallBacks();
2284 class DBFlushTestBlobError
: public DBFlushTest
,
2285 public testing::WithParamInterface
<std::string
> {
2287 DBFlushTestBlobError() : sync_point_(GetParam()) {}
2289 std::string sync_point_
;
2292 INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError
, DBFlushTestBlobError
,
2293 ::testing::ValuesIn(std::vector
<std::string
>{
2294 "BlobFileBuilder::WriteBlobToFile:AddRecord",
2295 "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
2297 TEST_P(DBFlushTestBlobError
, FlushError
) {
2299 options
.enable_blob_files
= true;
2300 options
.disable_auto_compactions
= true;
2305 ASSERT_OK(Put("key", "blob"));
2307 SyncPoint::GetInstance()->SetCallBack(sync_point_
, [this](void* arg
) {
2308 Status
* const s
= static_cast<Status
*>(arg
);
2311 (*s
) = Status::IOError(sync_point_
);
2313 SyncPoint::GetInstance()->EnableProcessing();
2315 ASSERT_NOK(Flush());
2317 SyncPoint::GetInstance()->DisableProcessing();
2318 SyncPoint::GetInstance()->ClearAllCallBacks();
2320 VersionSet
* const versions
= dbfull()->GetVersionSet();
2323 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
2326 Version
* const current
= cfd
->current();
2329 const VersionStorageInfo
* const storage_info
= current
->storage_info();
2330 assert(storage_info
);
2332 const auto& l0_files
= storage_info
->LevelFiles(0);
2333 ASSERT_TRUE(l0_files
.empty());
2335 const auto& blob_files
= storage_info
->GetBlobFiles();
2336 ASSERT_TRUE(blob_files
.empty());
2338 // Make sure the files generated by the failed job have been deleted
2339 std::vector
<std::string
> files
;
2340 ASSERT_OK(env_
->GetChildren(dbname_
, &files
));
2341 for (const auto& file
: files
) {
2342 uint64_t number
= 0;
2343 FileType type
= kTableFile
;
2345 if (!ParseFileName(file
, &number
, &type
)) {
2349 ASSERT_NE(type
, kTableFile
);
2350 ASSERT_NE(type
, kBlobFile
);
2353 #ifndef ROCKSDB_LITE
2354 const InternalStats
* const internal_stats
= cfd
->internal_stats();
2355 assert(internal_stats
);
2357 const auto& compaction_stats
= internal_stats
->TEST_GetCompactionStats();
2358 ASSERT_FALSE(compaction_stats
.empty());
2360 if (sync_point_
== "BlobFileBuilder::WriteBlobToFile:AddRecord") {
2361 ASSERT_EQ(compaction_stats
[0].bytes_written
, 0);
2362 ASSERT_EQ(compaction_stats
[0].bytes_written_blob
, 0);
2363 ASSERT_EQ(compaction_stats
[0].num_output_files
, 0);
2364 ASSERT_EQ(compaction_stats
[0].num_output_files_blob
, 0);
2366 // SST file writing succeeded; blob file writing failed (during Finish)
2367 ASSERT_GT(compaction_stats
[0].bytes_written
, 0);
2368 ASSERT_EQ(compaction_stats
[0].bytes_written_blob
, 0);
2369 ASSERT_EQ(compaction_stats
[0].num_output_files
, 1);
2370 ASSERT_EQ(compaction_stats
[0].num_output_files_blob
, 0);
2373 const uint64_t* const cf_stats_value
= internal_stats
->TEST_GetCFStatsValue();
2374 ASSERT_EQ(cf_stats_value
[InternalStats::BYTES_FLUSHED
],
2375 compaction_stats
[0].bytes_written
+
2376 compaction_stats
[0].bytes_written_blob
);
2377 #endif // ROCKSDB_LITE
2380 #ifndef ROCKSDB_LITE
2381 TEST_F(DBFlushTest
, TombstoneVisibleInSnapshot
) {
2382 class SimpleTestFlushListener
: public EventListener
{
2384 explicit SimpleTestFlushListener(DBFlushTest
* _test
) : test_(_test
) {}
2385 ~SimpleTestFlushListener() override
{}
2387 void OnFlushBegin(DB
* db
, const FlushJobInfo
& info
) override
{
2388 ASSERT_EQ(static_cast<uint32_t>(0), info
.cf_id
);
2390 ASSERT_OK(db
->Delete(WriteOptions(), "foo"));
2391 snapshot_
= db
->GetSnapshot();
2392 ASSERT_OK(db
->Put(WriteOptions(), "foo", "value"));
2394 auto* dbimpl
= static_cast_with_check
<DBImpl
>(db
);
2397 ColumnFamilyHandle
* cfh
= db
->DefaultColumnFamily();
2398 auto* cfhi
= static_cast_with_check
<ColumnFamilyHandleImpl
>(cfh
);
2400 ASSERT_OK(dbimpl
->TEST_SwitchMemtable(cfhi
->cfd()));
2403 DBFlushTest
* test_
= nullptr;
2404 const Snapshot
* snapshot_
= nullptr;
2407 Options options
= CurrentOptions();
2408 options
.create_if_missing
= true;
2409 auto* listener
= new SimpleTestFlushListener(this);
2410 options
.listeners
.emplace_back(listener
);
2411 DestroyAndReopen(options
);
2413 ASSERT_OK(db_
->Put(WriteOptions(), "foo", "value0"));
2415 ManagedSnapshot
snapshot_guard(db_
);
2417 ColumnFamilyHandle
* default_cf
= db_
->DefaultColumnFamily();
2418 ASSERT_OK(db_
->Flush(FlushOptions(), default_cf
));
2420 const Snapshot
* snapshot
= listener
->snapshot_
;
2423 ReadOptions read_opts
;
2424 read_opts
.snapshot
= snapshot
;
2426 // Using snapshot should not see "foo".
2429 Status s
= db_
->Get(read_opts
, "foo", &value
);
2430 ASSERT_TRUE(s
.IsNotFound());
2433 db_
->ReleaseSnapshot(snapshot
);
2436 TEST_P(DBAtomicFlushTest
, ManualFlushUnder2PC
) {
2437 Options options
= CurrentOptions();
2438 options
.create_if_missing
= true;
2439 options
.allow_2pc
= true;
2440 options
.atomic_flush
= GetParam();
2441 // 64MB so that memtable flush won't be trigger by the small writes.
2442 options
.write_buffer_size
= (static_cast<size_t>(64) << 20);
2444 // Destroy the DB to recreate as a TransactionDB.
2446 Destroy(options
, true);
2448 // Create a TransactionDB.
2449 TransactionDB
* txn_db
= nullptr;
2450 TransactionDBOptions txn_db_opts
;
2451 txn_db_opts
.write_policy
= TxnDBWritePolicy::WRITE_COMMITTED
;
2452 ASSERT_OK(TransactionDB::Open(options
, txn_db_opts
, dbname_
, &txn_db
));
2453 ASSERT_NE(txn_db
, nullptr);
2456 // Create two more columns other than default CF.
2457 std::vector
<std::string
> cfs
= {"puppy", "kitty"};
2458 CreateColumnFamilies(cfs
, options
);
2459 ASSERT_EQ(handles_
.size(), 2);
2460 ASSERT_EQ(handles_
[0]->GetName(), cfs
[0]);
2461 ASSERT_EQ(handles_
[1]->GetName(), cfs
[1]);
2462 const size_t kNumCfToFlush
= options
.atomic_flush
? 2 : 1;
2465 TransactionOptions txn_opts
;
2466 // txn1 only prepare, but does not commit.
2467 // The WAL containing the prepared but uncommitted data must be kept.
2468 Transaction
* txn1
= txn_db
->BeginTransaction(wopts
, txn_opts
, nullptr);
2469 // txn2 not only prepare, but also commit.
2470 Transaction
* txn2
= txn_db
->BeginTransaction(wopts
, txn_opts
, nullptr);
2471 ASSERT_NE(txn1
, nullptr);
2472 ASSERT_NE(txn2
, nullptr);
2473 for (size_t i
= 0; i
< kNumCfToFlush
; i
++) {
2474 ASSERT_OK(txn1
->Put(handles_
[i
], "k1", "v1"));
2475 ASSERT_OK(txn2
->Put(handles_
[i
], "k2", "v2"));
2477 // A txn must be named before prepare.
2478 ASSERT_OK(txn1
->SetName("txn1"));
2479 ASSERT_OK(txn2
->SetName("txn2"));
2480 // Prepare writes to WAL, but not to memtable. (WriteCommitted)
2481 ASSERT_OK(txn1
->Prepare());
2482 ASSERT_OK(txn2
->Prepare());
2483 // Commit writes to memtable.
2484 ASSERT_OK(txn2
->Commit());
2488 // There are still data in memtable not flushed.
2489 // But since data is small enough to reside in the active memtable,
2490 // there are no immutable memtable.
2491 for (size_t i
= 0; i
< kNumCfToFlush
; i
++) {
2492 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2493 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
2494 ASSERT_FALSE(cfh
->cfd()->mem()->IsEmpty());
2497 // Atomic flush memtables,
2498 // the min log with prepared data should be written to MANIFEST.
2499 std::vector
<ColumnFamilyHandle
*> cfs_to_flush(kNumCfToFlush
);
2500 for (size_t i
= 0; i
< kNumCfToFlush
; i
++) {
2501 cfs_to_flush
[i
] = handles_
[i
];
2503 ASSERT_OK(txn_db
->Flush(FlushOptions(), cfs_to_flush
));
2505 // There are no remaining data in memtable after flush.
2506 for (size_t i
= 0; i
< kNumCfToFlush
; i
++) {
2507 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2508 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
2509 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
2510 ASSERT_EQ(cfh
->cfd()->GetFlushReason(), FlushReason::kManualFlush
);
2513 // The recovered min log number with prepared data should be non-zero.
2514 // In 2pc mode, MinLogNumberToKeep returns the
2515 // VersionSet::min_log_number_to_keep recovered from MANIFEST, if it's 0,
2516 // it means atomic flush didn't write the min_log_number_to_keep to MANIFEST.
2517 cfs
.push_back(kDefaultColumnFamilyName
);
2518 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
2519 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db_
);
2520 ASSERT_TRUE(db_impl
->allow_2pc());
2521 ASSERT_NE(db_impl
->MinLogNumberToKeep(), 0);
2523 #endif // ROCKSDB_LITE
2525 TEST_P(DBAtomicFlushTest
, ManualAtomicFlush
) {
2526 Options options
= CurrentOptions();
2527 options
.create_if_missing
= true;
2528 options
.atomic_flush
= GetParam();
2529 options
.write_buffer_size
= (static_cast<size_t>(64) << 20);
2531 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
2532 size_t num_cfs
= handles_
.size();
2533 ASSERT_EQ(3, num_cfs
);
2535 wopts
.disableWAL
= true;
2536 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2537 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
2540 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2541 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2542 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
2543 ASSERT_FALSE(cfh
->cfd()->mem()->IsEmpty());
2546 std::vector
<int> cf_ids
;
2547 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2548 cf_ids
.emplace_back(static_cast<int>(i
));
2550 ASSERT_OK(Flush(cf_ids
));
2552 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2553 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2554 ASSERT_EQ(cfh
->cfd()->GetFlushReason(), FlushReason::kManualFlush
);
2555 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
2556 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
2560 TEST_P(DBAtomicFlushTest
, PrecomputeMinLogNumberToKeepNon2PC
) {
2561 Options options
= CurrentOptions();
2562 options
.create_if_missing
= true;
2563 options
.atomic_flush
= GetParam();
2564 options
.write_buffer_size
= (static_cast<size_t>(64) << 20);
2565 CreateAndReopenWithCF({"pikachu"}, options
);
2567 const size_t num_cfs
= handles_
.size();
2568 ASSERT_EQ(num_cfs
, 2);
2570 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2571 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
2575 // Flush the default CF only.
2576 std::vector
<int> cf_ids
{0};
2577 ASSERT_OK(Flush(cf_ids
));
2579 autovector
<ColumnFamilyData
*> flushed_cfds
;
2580 autovector
<autovector
<VersionEdit
*>> flush_edits
;
2581 auto flushed_cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[0]);
2582 flushed_cfds
.push_back(flushed_cfh
->cfd());
2583 flush_edits
.push_back({});
2584 auto unflushed_cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1]);
2586 ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
2587 flushed_cfds
, flush_edits
),
2588 unflushed_cfh
->cfd()->GetLogNumber());
2593 std::vector
<int> cf_ids
;
2594 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2595 cf_ids
.emplace_back(static_cast<int>(i
));
2597 ASSERT_OK(Flush(cf_ids
));
2598 uint64_t log_num_after_flush
= dbfull()->TEST_GetCurrentLogNumber();
2600 uint64_t min_log_number_to_keep
= std::numeric_limits
<uint64_t>::max();
2601 autovector
<ColumnFamilyData
*> flushed_cfds
;
2602 autovector
<autovector
<VersionEdit
*>> flush_edits
;
2603 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2604 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2605 flushed_cfds
.push_back(cfh
->cfd());
2606 flush_edits
.push_back({});
2607 min_log_number_to_keep
=
2608 std::min(min_log_number_to_keep
, cfh
->cfd()->GetLogNumber());
2610 ASSERT_EQ(min_log_number_to_keep
, log_num_after_flush
);
2611 ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
2612 flushed_cfds
, flush_edits
),
2613 min_log_number_to_keep
);
2617 TEST_P(DBAtomicFlushTest
, AtomicFlushTriggeredByMemTableFull
) {
2618 Options options
= CurrentOptions();
2619 options
.create_if_missing
= true;
2620 options
.atomic_flush
= GetParam();
2621 // 4KB so that we can easily trigger auto flush.
2622 options
.write_buffer_size
= 4096;
2624 SyncPoint::GetInstance()->LoadDependency(
2625 {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
2626 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
2627 SyncPoint::GetInstance()->EnableProcessing();
2629 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
2630 size_t num_cfs
= handles_
.size();
2631 ASSERT_EQ(3, num_cfs
);
2633 wopts
.disableWAL
= true;
2634 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2635 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
2637 // Keep writing to one of them column families to trigger auto flush.
2638 for (int i
= 0; i
!= 4000; ++i
) {
2639 ASSERT_OK(Put(static_cast<int>(num_cfs
) - 1 /*cf*/,
2640 "key" + std::to_string(i
), "value" + std::to_string(i
),
2645 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
2646 if (options
.atomic_flush
) {
2647 for (size_t i
= 0; i
+ 1 != num_cfs
; ++i
) {
2648 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2649 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
2650 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
2653 for (size_t i
= 0; i
+ 1 != num_cfs
; ++i
) {
2654 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2655 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
2656 ASSERT_FALSE(cfh
->cfd()->mem()->IsEmpty());
2659 SyncPoint::GetInstance()->DisableProcessing();
2662 TEST_P(DBAtomicFlushTest
, AtomicFlushRollbackSomeJobs
) {
2663 bool atomic_flush
= GetParam();
2664 if (!atomic_flush
) {
2667 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
2668 new FaultInjectionTestEnv(env_
));
2669 Options options
= CurrentOptions();
2670 options
.create_if_missing
= true;
2671 options
.atomic_flush
= atomic_flush
;
2672 options
.env
= fault_injection_env
.get();
2673 SyncPoint::GetInstance()->DisableProcessing();
2674 SyncPoint::GetInstance()->LoadDependency(
2675 {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
2676 "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
2677 {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
2678 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
2679 SyncPoint::GetInstance()->EnableProcessing();
2681 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
2682 size_t num_cfs
= handles_
.size();
2683 ASSERT_EQ(3, num_cfs
);
2685 wopts
.disableWAL
= true;
2686 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2687 int cf_id
= static_cast<int>(i
);
2688 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
2690 FlushOptions flush_opts
;
2691 flush_opts
.wait
= false;
2692 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
2693 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
2694 fault_injection_env
->SetFilesystemActive(false);
2695 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
2696 for (auto* cfh
: handles_
) {
2697 // Returns the IO error happend during flush.
2698 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh
));
2700 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2701 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
2702 ASSERT_EQ(1, cfh
->cfd()->imm()->NumNotFlushed());
2703 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
2705 fault_injection_env
->SetFilesystemActive(true);
2709 TEST_P(DBAtomicFlushTest
, FlushMultipleCFs_DropSomeBeforeRequestFlush
) {
2710 bool atomic_flush
= GetParam();
2711 if (!atomic_flush
) {
2714 Options options
= CurrentOptions();
2715 options
.create_if_missing
= true;
2716 options
.atomic_flush
= atomic_flush
;
2717 SyncPoint::GetInstance()->DisableProcessing();
2718 SyncPoint::GetInstance()->ClearAllCallBacks();
2719 SyncPoint::GetInstance()->EnableProcessing();
2721 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
2722 size_t num_cfs
= handles_
.size();
2723 ASSERT_EQ(3, num_cfs
);
2725 wopts
.disableWAL
= true;
2726 std::vector
<int> cf_ids
;
2727 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2728 int cf_id
= static_cast<int>(i
);
2729 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
2730 cf_ids
.push_back(cf_id
);
2732 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
2733 ASSERT_TRUE(Flush(cf_ids
).IsColumnFamilyDropped());
2737 TEST_P(DBAtomicFlushTest
,
2738 FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun
) {
2739 bool atomic_flush
= GetParam();
2740 if (!atomic_flush
) {
2743 Options options
= CurrentOptions();
2744 options
.create_if_missing
= true;
2745 options
.atomic_flush
= atomic_flush
;
2747 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
2749 SyncPoint::GetInstance()->DisableProcessing();
2750 SyncPoint::GetInstance()->ClearAllCallBacks();
2751 SyncPoint::GetInstance()->LoadDependency(
2752 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
2753 "DBAtomicFlushTest::BeforeDropCF"},
2754 {"DBAtomicFlushTest::AfterDropCF",
2755 "DBImpl::BackgroundCallFlush:start"}});
2756 SyncPoint::GetInstance()->EnableProcessing();
2758 size_t num_cfs
= handles_
.size();
2759 ASSERT_EQ(3, num_cfs
);
2761 wopts
.disableWAL
= true;
2762 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2763 int cf_id
= static_cast<int>(i
);
2764 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
2766 port::Thread
user_thread([&]() {
2767 TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
2768 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
2769 TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
2771 FlushOptions flush_opts
;
2772 flush_opts
.wait
= true;
2773 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
2775 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2776 int cf_id
= static_cast<int>(i
);
2777 ASSERT_EQ("value", Get(cf_id
, "key"));
2780 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "eevee"}, options
);
2781 num_cfs
= handles_
.size();
2782 ASSERT_EQ(2, num_cfs
);
2783 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
2784 int cf_id
= static_cast<int>(i
);
2785 ASSERT_EQ("value", Get(cf_id
, "key"));
2790 TEST_P(DBAtomicFlushTest
, TriggerFlushAndClose
) {
2791 bool atomic_flush
= GetParam();
2792 if (!atomic_flush
) {
2795 const int kNumKeysTriggerFlush
= 4;
2796 Options options
= CurrentOptions();
2797 options
.create_if_missing
= true;
2798 options
.atomic_flush
= atomic_flush
;
2799 options
.memtable_factory
.reset(
2800 test::NewSpecialSkipListFactory(kNumKeysTriggerFlush
));
2801 CreateAndReopenWithCF({"pikachu"}, options
);
2803 for (int i
= 0; i
!= kNumKeysTriggerFlush
; ++i
) {
2804 ASSERT_OK(Put(0, "key" + std::to_string(i
), "value" + std::to_string(i
)));
2806 SyncPoint::GetInstance()->DisableProcessing();
2807 SyncPoint::GetInstance()->ClearAllCallBacks();
2808 SyncPoint::GetInstance()->EnableProcessing();
2809 ASSERT_OK(Put(0, "key", "value"));
2812 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "pikachu"}, options
);
2813 ASSERT_EQ("value", Get(0, "key"));
2816 TEST_P(DBAtomicFlushTest
, PickMemtablesRaceWithBackgroundFlush
) {
2817 bool atomic_flush
= GetParam();
2818 Options options
= CurrentOptions();
2819 options
.create_if_missing
= true;
2820 options
.atomic_flush
= atomic_flush
;
2821 options
.max_write_buffer_number
= 4;
2822 // Set min_write_buffer_number_to_merge to be greater than 1, so that
2823 // a column family with one memtable in the imm will not cause IsFlushPending
2824 // to return true when flush_requested_ is false.
2825 options
.min_write_buffer_number_to_merge
= 2;
2826 CreateAndReopenWithCF({"pikachu"}, options
);
2827 ASSERT_EQ(2, handles_
.size());
2828 ASSERT_OK(dbfull()->PauseBackgroundWork());
2829 ASSERT_OK(Put(0, "key00", "value00"));
2830 ASSERT_OK(Put(1, "key10", "value10"));
2831 FlushOptions flush_opts
;
2832 flush_opts
.wait
= false;
2833 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
2834 ASSERT_OK(Put(0, "key01", "value01"));
2835 // Since max_write_buffer_number is 4, the following flush won't cause write
2837 ASSERT_OK(dbfull()->Flush(flush_opts
));
2838 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
2839 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_
[1]));
2840 handles_
[1] = nullptr;
2841 ASSERT_OK(dbfull()->ContinueBackgroundWork());
2842 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_
[0]));
2847 TEST_P(DBAtomicFlushTest
, CFDropRaceWithWaitForFlushMemTables
) {
2848 bool atomic_flush
= GetParam();
2849 if (!atomic_flush
) {
2852 Options options
= CurrentOptions();
2853 options
.create_if_missing
= true;
2854 options
.atomic_flush
= atomic_flush
;
2855 CreateAndReopenWithCF({"pikachu"}, options
);
2856 SyncPoint::GetInstance()->DisableProcessing();
2857 SyncPoint::GetInstance()->LoadDependency(
2858 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
2859 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
2860 {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
2861 "DBImpl::BackgroundCallFlush:start"},
2862 {"DBImpl::BackgroundCallFlush:start",
2863 "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
2864 SyncPoint::GetInstance()->EnableProcessing();
2865 ASSERT_EQ(2, handles_
.size());
2866 ASSERT_OK(Put(0, "key", "value"));
2867 ASSERT_OK(Put(1, "key", "value"));
2869 static_cast<ColumnFamilyHandleImpl
*>(dbfull()->DefaultColumnFamily())
2871 auto* cfd_pikachu
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1])->cfd();
2872 port::Thread
drop_cf_thr([&]() {
2874 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
2875 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
2879 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
2881 FlushOptions flush_opts
;
2882 flush_opts
.allow_write_stall
= true;
2883 ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default
, cfd_pikachu
},
2887 SyncPoint::GetInstance()->DisableProcessing();
2890 TEST_P(DBAtomicFlushTest
, RollbackAfterFailToInstallResults
) {
2891 bool atomic_flush
= GetParam();
2892 if (!atomic_flush
) {
2895 auto fault_injection_env
= std::make_shared
<FaultInjectionTestEnv
>(env_
);
2896 Options options
= CurrentOptions();
2897 options
.env
= fault_injection_env
.get();
2898 options
.create_if_missing
= true;
2899 options
.atomic_flush
= atomic_flush
;
2900 CreateAndReopenWithCF({"pikachu"}, options
);
2901 ASSERT_EQ(2, handles_
.size());
2902 for (size_t cf
= 0; cf
< handles_
.size(); ++cf
) {
2903 ASSERT_OK(Put(static_cast<int>(cf
), "a", "value"));
2905 SyncPoint::GetInstance()->DisableProcessing();
2906 SyncPoint::GetInstance()->ClearAllCallBacks();
2907 SyncPoint::GetInstance()->SetCallBack(
2908 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
2909 [&](void* /*arg*/) { fault_injection_env
->SetFilesystemActive(false); });
2910 SyncPoint::GetInstance()->EnableProcessing();
2911 FlushOptions flush_opts
;
2912 Status s
= db_
->Flush(flush_opts
, handles_
);
2914 fault_injection_env
->SetFilesystemActive(true);
2916 SyncPoint::GetInstance()->ClearAllCallBacks();
// In atomic flush, concurrent bg flush threads commit to the MANIFEST in
// serial, in the order of their picked memtables for each column family.
// Only when a bg flush thread finds out that its memtables are the earliest
// unflushed ones for all the included column families will this bg flush
// thread continue to commit to MANIFEST.
// This unit test uses sync point to coordinate the execution of two bg flush
// threads executing the same sequence of functions. The interleavings are as
// follows.
// time            bg flush thread 1              bg flush thread 2
//  |   pick memtables to flush
//  |   flush memtables cf1_m1, cf2_m1
//  |   join MANIFEST write queue
//  |                                   pick memtables to flush
//  |                                   flush memtables cf1_(m1+1)
//  |                                   join MANIFEST write queue
//  |                                   wait to write MANIFEST
//  |   write MANIFEST (IO error injected)
//  |                                   detect IO error and stop waiting
//  V
2938 TEST_P(DBAtomicFlushTest
, BgThreadNoWaitAfterManifestError
) {
2939 bool atomic_flush
= GetParam();
2940 if (!atomic_flush
) {
2943 auto fault_injection_env
= std::make_shared
<FaultInjectionTestEnv
>(env_
);
2944 Options options
= GetDefaultOptions();
2945 options
.create_if_missing
= true;
2946 options
.atomic_flush
= true;
2947 options
.env
= fault_injection_env
.get();
2948 // Set a larger value than default so that RocksDB can schedule concurrent
2949 // background flush threads.
2950 options
.max_background_jobs
= 8;
2951 options
.max_write_buffer_number
= 8;
2952 CreateAndReopenWithCF({"pikachu"}, options
);
2954 assert(2 == handles_
.size());
2956 WriteOptions write_opts
;
2957 write_opts
.disableWAL
= true;
2959 ASSERT_OK(Put(0, "a", "v_0_a", write_opts
));
2960 ASSERT_OK(Put(1, "a", "v_1_a", write_opts
));
2962 SyncPoint::GetInstance()->DisableProcessing();
2963 SyncPoint::GetInstance()->ClearAllCallBacks();
2965 SyncPoint::GetInstance()->LoadDependency({
2966 {"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"},
2969 std::thread::id bg_flush_thr1
, bg_flush_thr2
;
2970 SyncPoint::GetInstance()->SetCallBack(
2971 "DBImpl::BackgroundCallFlush:start", [&](void*) {
2972 if (bg_flush_thr1
== std::thread::id()) {
2973 bg_flush_thr1
= std::this_thread::get_id();
2974 } else if (bg_flush_thr2
== std::thread::id()) {
2975 bg_flush_thr2
= std::this_thread::get_id();
2980 SyncPoint::GetInstance()->SetCallBack(
2981 "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg
) {
2982 if (std::this_thread::get_id() == bg_flush_thr2
) {
2983 const auto* ptr
= reinterpret_cast<std::pair
<Status
, bool>*>(arg
);
2986 // When bg flush thread 2 reaches here for the first time.
2987 ASSERT_OK(ptr
->first
);
2988 ASSERT_TRUE(ptr
->second
);
2989 } else if (1 == called
) {
2990 // When bg flush thread 2 reaches here for the second time.
2991 ASSERT_TRUE(ptr
->first
.IsIOError());
2992 ASSERT_FALSE(ptr
->second
);
2995 TEST_SYNC_POINT("BgFlushThr2:WaitToCommit");
2999 SyncPoint::GetInstance()->SetCallBack(
3000 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
3002 if (std::this_thread::get_id() == bg_flush_thr1
) {
3003 TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest");
3007 SyncPoint::GetInstance()->SetCallBack(
3008 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
3009 if (std::this_thread::get_id() != bg_flush_thr1
) {
3012 ASSERT_OK(db_
->Put(write_opts
, "b", "v_1_b"));
3014 FlushOptions flush_opts
;
3015 flush_opts
.wait
= false;
3016 std::vector
<ColumnFamilyHandle
*> cfhs(1, db_
->DefaultColumnFamily());
3017 ASSERT_OK(dbfull()->Flush(flush_opts
, cfhs
));
3020 SyncPoint::GetInstance()->SetCallBack(
3021 "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg
) {
3022 auto* ptr
= reinterpret_cast<IOStatus
*>(arg
);
3024 *ptr
= IOStatus::IOError("Injected failure");
3026 SyncPoint::GetInstance()->EnableProcessing();
3028 ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_
).IsIOError());
3031 SyncPoint::GetInstance()->DisableProcessing();
3032 SyncPoint::GetInstance()->ClearAllCallBacks();
3035 TEST_P(DBAtomicFlushTest
, NoWaitWhenWritesStopped
) {
3036 Options options
= GetDefaultOptions();
3037 options
.create_if_missing
= true;
3038 options
.atomic_flush
= GetParam();
3039 options
.max_write_buffer_number
= 2;
3040 options
.memtable_factory
.reset(test::NewSpecialSkipListFactory(1));
3044 SyncPoint::GetInstance()->DisableProcessing();
3045 SyncPoint::GetInstance()->LoadDependency(
3046 {{"DBImpl::DelayWrite:Start",
3047 "DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}});
3048 SyncPoint::GetInstance()->EnableProcessing();
3050 ASSERT_OK(dbfull()->PauseBackgroundWork());
3051 for (int i
= 0; i
< options
.max_write_buffer_number
; ++i
) {
3052 ASSERT_OK(Put("k" + std::to_string(i
), "v" + std::to_string(i
)));
3054 std::thread
stalled_writer([&]() { ASSERT_OK(Put("k", "v")); });
3056 TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0");
3059 FlushOptions flush_opts
;
3060 flush_opts
.wait
= false;
3061 flush_opts
.allow_write_stall
= true;
3062 ASSERT_TRUE(db_
->Flush(flush_opts
).IsTryAgain());
3065 ASSERT_OK(dbfull()->ContinueBackgroundWork());
3066 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
3068 stalled_writer
.join();
3070 SyncPoint::GetInstance()->DisableProcessing();
3073 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest
, DBFlushDirectIOTest
,
3076 INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest
, DBAtomicFlushTest
, testing::Bool());
3078 } // namespace ROCKSDB_NAMESPACE
3080 int main(int argc
, char** argv
) {
3081 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
3082 ::testing::InitGoogleTest(&argc
, argv
);
3083 return RUN_ALL_TESTS();