// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>
#include <limits>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {

// Value written by the ConditionalUpdateFilter (defined below) when it
// rewrites a key-value pair during flush/compaction.
static std::string NEW_VALUE = "NewValue";

class DBFlushTest : public DBTestBase {
 public:
  DBFlushTest() : DBTestBase("db_flush_test", /*env_do_fsync=*/true) {}
};

class DBFlushDirectIOTest : public DBFlushTest,
                            public ::testing::WithParamInterface<bool> {
 public:
  DBFlushDirectIOTest() : DBFlushTest() {}
};

class DBAtomicFlushTest : public DBFlushTest,
                          public ::testing::WithParamInterface<bool> {
 public:
  DBAtomicFlushTest() : DBFlushTest() {}
};

// We had an issue where, when two background threads tried to flush at the
// same time, only one of them got committed. This test verifies that the
// issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  Options options;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.env = env_;
  Reopen(options);
  FlushOptions no_wait;
  no_wait.wait = false;
  no_wait.allow_write_stall = true;
  SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifest",
        "DBFlushTest::FlushWhileWritingManifest:1"},
       {"MemTableList::TryInstallMemtableFlushResults:InProgress",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("foo", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
  ASSERT_OK(Put("bar", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  // If the issue is hit, we will wait here forever.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  ASSERT_EQ(2, TotalTableFiles());
#endif  // ROCKSDB_LITE
}

// Disable this test temporarily on Travis as it fails intermittently.
// Github issue: #4151
TEST_F(DBFlushTest, SyncFail) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
       {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put("key", "value"));
  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));
  // Flush installs a new super-version. Get the ref count after that.
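  // Deactivating the fault-injection filesystem makes subsequent writes and
  // syncs fail, so the pending SyncClosedLogs() call returns an error.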
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  fault_injection_env->SetFilesystemActive(true);
  // Now the background job will do the flush; wait for it.
  // TEST_WaitForFlushMemTable() returns the IO error that happened during
  // the flush.
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  ASSERT_EQ("", FilesPerLevel());  // flush failed.
#endif  // ROCKSDB_LITE
  Destroy(options);
}

TEST_F(DBFlushTest, SyncSkip) {
  Options options = CurrentOptions();

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
       {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("key", "value"));

  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));

  TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
  TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");

  // Now the background job will do the flush; wait for it.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  Destroy(options);
}

TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  // Verify setting an empty high-pri (flush) thread pool causes flushes to be
  // scheduled in the low-pri (compaction) thread pool.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  Reopen(options);
  env_->SetBackgroundThreads(0, Env::HIGH);

  std::thread::id tid;
  int num_flushes = 0, num_compactions = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
        if (tid == std::thread::id()) {
          tid = std::this_thread::get_id();
        } else {
          ASSERT_EQ(tid, std::this_thread::get_id());
        }
        ++num_flushes;
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
        ASSERT_EQ(tid, std::this_thread::get_id());
        ++num_compactions;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key", "val"));
  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put("key", "val"));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(4, num_flushes);
  ASSERT_EQ(1, num_compactions);
}

// Test that when a flush job is submitted to the low-priority thread pool
// and the DB is closed in the meanwhile, CloseHelper() doesn't hang.
TEST_F(DBFlushTest, CloseDBWhenFlushInLowPri) {
  Options options = CurrentOptions();
  options.max_background_flushes = 1;
  options.max_total_wal_size = 8192;

  DestroyAndReopen(options);
  CreateColumnFamilies({"cf1", "cf2"}, options);

  env_->SetBackgroundThreads(0, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
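  // With a single LOW thread, the sleeping task scheduled below occupies the
  // whole pool, so a queued flush job cannot run until it is either
  // unscheduled or the sleeping task wakes up.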
  test::SleepingBackgroundTask sleeping_task_low;
  int num_flushes = 0;

  SyncPoint::GetInstance()->SetCallBack("DBImpl::BGWorkFlush",
                                        [&](void* /*arg*/) { ++num_flushes; });

  int num_low_flush_unscheduled = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::UnscheduleLowFlushCallback", [&](void* /*arg*/) {
        num_low_flush_unscheduled++;
        // There should be one flush job in the low pool that needs to be
        // unscheduled.
        ASSERT_EQ(num_low_flush_unscheduled, 1);
      });

  int num_high_flush_unscheduled = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::UnscheduleHighFlushCallback", [&](void* /*arg*/) {
        num_high_flush_unscheduled++;
        // There should be no flush job in the high pool.
        ASSERT_EQ(num_high_flush_unscheduled, 0);
      });

  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(0, "key1", DummyString(8192)));
  // Block the thread so that the flush cannot run and can be removed from the
  // queue when Unschedule() is called.
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  sleeping_task_low.WaitUntilSleeping();

  // Trigger a flush; the flush job will be scheduled to the LOW priority
  // thread pool.
  ASSERT_OK(Put(0, "key2", DummyString(8192)));

  // Close the DB; the flush job in the low priority queue will be removed
  // without running.
  Close();
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
  ASSERT_EQ(0, num_flushes);

  TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, options);
  ASSERT_OK(Put(0, "key3", DummyString(8192)));
  ASSERT_OK(Flush(0));
  ASSERT_EQ(1, num_flushes);
}

TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush",
        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
        "FlushJob::WriteLevel0Table"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "value1"));

  port::Thread t([&]() {
    // This call waits for the flush to finish, i.e. with
    // flush_options.wait = true.
    ASSERT_OK(Flush());
  });

  // Wait for the flush to start.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
  // Insert a second memtable before the manual flush finishes.
  // At the end of the manual flush job, it will check whether a further flush
  // is needed, but it will not trigger a flush of the second memtable because
  // min_write_buffer_number_to_merge is not reached.
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
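  // TEST_SwitchMemtable() seals the active memtable and installs a fresh one,
  // leaving the sealed memtable in the immutable list without flushing it.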
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");

  // The manual flush should return without waiting for the second flush
  // indefinitely.
  t.join();
}

TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
  Options options = CurrentOptions();
  Reopen(options);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int called = 0;
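  // At "AfterSchedule", no flush request should remain unscheduled: the
  // single request is picked up by exactly one background thread.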
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        auto unscheduled_flushes = *reinterpret_cast<int*>(arg);
        ASSERT_EQ(0, unscheduled_flushes);
        ++called;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("a", "foo"));
  FlushOptions flush_opts;
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_EQ(1, called);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

// The following 3 tests are designed for testing garbage statistics at flush
// time.
//
// ======= General Information ======= (from the GitHub Wiki).
// There are three scenarios where a memtable flush can be triggered:
//
// 1 - Memtable size exceeds ColumnFamilyOptions::write_buffer_size
//     after a write.
// 2 - Total memtable size across all column families exceeds
//     DBOptions::db_write_buffer_size, or DBOptions::write_buffer_manager
//     signals a flush. In this scenario the largest memtable will be flushed.
// 3 - Total WAL file size exceeds DBOptions::max_total_wal_size.
//     In this scenario the memtable with the oldest data will be flushed,
//     in order to allow the WAL file with data from this memtable to be
//     purged.
//
// As a result, a memtable can be flushed before it is full. This is one
// reason the generated SST file can be smaller than the corresponding
// memtable. Compression is another factor that can make an SST file smaller
// than the corresponding memtable, since data in a memtable is uncompressed.
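//
// As an illustration (values arbitrary, not defaults), the three triggers
// above map to the following option fields:
//   Options opts;
//   opts.write_buffer_size = 64 << 20;       // (1) per-memtable limit
//   opts.db_write_buffer_size = 256 << 20;   // (2) limit across all CFs
//   opts.max_total_wal_size = 512 << 20;     // (3) total WAL size limit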

TEST_F(DBFlushTest, StatisticsGarbageBasic) {
  Options options = CurrentOptions();

  // The following options are used to enforce several values that
  // may already exist as default values, to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();

  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);

  // Create the DB if it's not already present.
  options.create_if_missing = true;

  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;

  // Prevent memtable in-place updates. Should already be disabled
  // (from the Wiki:
  //   In-place updates can be enabled by toggling on the bool
  //   inplace_update_support flag. However, this flag is by default set to
  //   false because this thread-safe in-place update support is not
  //   compatible with concurrent memtable writes. Note that the bool
  //   allow_concurrent_memtable_write is set to true by default).
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;

  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
  options.write_buffer_size = 64 << 20;

  ASSERT_OK(TryReopen(options));

  // Put the same key-values multiple times.
  // The encoded length of a db entry in the memtable is
  // defined in db/memtable.cc (MemTable::Add) as the variable:
  //   encoded_len = VarintLength(internal_key_size)  // min # of bytes needed
  //                                                  //   to store
  //                                                  //   internal_key_size
  //               + internal_key_size                // actual key string
  //                                                  //   (w/o terminating
  //                                                  //   null char) + 8
  //                                                  //   bytes for the fixed
  //                                                  //   uint64 "seq number
  //                                                  //   + insertion type"
  //               + VarintLength(val_size)           // min # of bytes needed
  //                                                  //   to store val_size
  //               + val_size                         // actual value string
  // For example, in our situation, "key1" has size 4 and "value1" has size 6
  // (the terminating null characters are not copied over to the memtable).
  // Therefore encoded_len = 1 + (4+8) + 1 + 6 = 20 bytes per entry.
  // However, in terms of raw data contained in the memtable, and written
  // over to the SSTable, we only count internal_key_size and val_size,
  // because this is the only raw chunk of bytes that contains everything
  // necessary to reconstruct a user entry: sequence number, insertion type,
  // key, and value.

  // To test the relevance of our memtable garbage statistics,
  // namely MEMTABLE_PAYLOAD_BYTES_AT_FLUSH and MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
  // we insert K-V pairs with 3 distinct keys (of length 4),
  // and random values of arbitrary length RAND_VALUES_LENGTH,
  // and we repeat this step NUM_REPEAT times total.
  // At the end, we insert 3 final K-V pairs with the same 3 keys
  // and known values (these will be the final values, of length 6).
  // I chose NUM_REPEAT=2,000 such that no automatic flush is
  // triggered (the number of bytes in the memtable is therefore
  // well below any meaningful heuristic for a memtable of size 64MB).
  // As a result, since each K-V pair is inserted as a payload
  // of N meaningful bytes (sequence number, insertion type,
  // key, and value = 8 + 4 + RAND_VALUES_LENGTH),
  // MEMTABLE_GARBAGE_BYTES_AT_FLUSH should be equal to 2,000 * N bytes
  // and MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = MEMTABLE_GARBAGE_BYTES_AT_FLUSH +
  // (3*(8 + 4 + 6)) bytes. For RAND_VALUES_LENGTH = 172 (arbitrary value), we
  // expect:
  //   N = 8 + 4 + 172 = 184 bytes
  //   MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 2,000 * 184 = 368,000 bytes.
  //   MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 368,000 + 3*18 = 368,054 bytes.

  const size_t NUM_REPEAT = 2000;
  const size_t RAND_VALUES_LENGTH = 172;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string VALUE1 = "value1";
  const std::string VALUE2 = "value2";
  const std::string VALUE3 = "value3";
  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  Random rnd(301);
  // Insertion of K-V pairs, multiple times.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);
  }

  // The memtable data bytes include the "garbage"
  // bytes along with the useful payload.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  ASSERT_OK(Put(KEY1, VALUE1));
  ASSERT_OK(Put(KEY2, VALUE2));
  ASSERT_OK(Put(KEY3, VALUE3));

  // Add useful payload to the memtable data bytes:
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      KEY1.size() + VALUE1.size() + KEY2.size() + VALUE2.size() + KEY3.size() +
      VALUE3.size() + 3 * sizeof(uint64_t);

  // We assert that the last K-V pairs have been successfully inserted,
  // and that the valid values are VALUE1, VALUE2, VALUE3.
  PinnableSlice value;
  ASSERT_OK(Get(KEY1, &value));
  ASSERT_EQ(value.ToString(), VALUE1);
  ASSERT_OK(Get(KEY2, &value));
  ASSERT_EQ(value.ToString(), VALUE2);
  ASSERT_OK(Get(KEY3, &value));
  ASSERT_EQ(value.ToString(), VALUE3);

  // Force flush to SST. Increments the statistics counters.
  ASSERT_OK(Flush());

  // Collect statistics.
  uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}

TEST_F(DBFlushTest, StatisticsGarbageInsertAndDeletes) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 67108864;

  ASSERT_OK(TryReopen(options));

  const size_t NUM_REPEAT = 2000;
  const size_t RAND_VALUES_LENGTH = 37;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string KEY4 = "key4";
  const std::string KEY5 = "key5";
  const std::string KEY6 = "key6";

  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  WriteBatch batch;

  Random rnd(301);
  // Insertion of K-V pairs, multiple times.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);
    ASSERT_OK(Delete(KEY1));
    ASSERT_OK(Delete(KEY2));
    ASSERT_OK(Delete(KEY3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + KEY2.size() + KEY3.size() + 3 * sizeof(uint64_t);
  }

  // The memtable data bytes include the "garbage"
  // bytes along with the useful payload.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  // Note: one set of deletes for KEY1, KEY2, KEY3 is written to the
  // SSTable to propagate the delete operations to K-V pairs
  // that could have been inserted into the database during past flush
  // operations.
  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
      KEY1.size() + KEY2.size() + KEY3.size() + 3 * sizeof(uint64_t);

  // Additional useful payload.
  ASSERT_OK(Delete(KEY4));
  ASSERT_OK(Delete(KEY5));
  ASSERT_OK(Delete(KEY6));

  // Add useful payload to the memtable data bytes:
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      KEY4.size() + KEY5.size() + KEY6.size() + 3 * sizeof(uint64_t);

  // We assert that the K-V pairs have been successfully deleted.
  PinnableSlice value;
  ASSERT_NOK(Get(KEY1, &value));
  ASSERT_NOK(Get(KEY2, &value));
  ASSERT_NOK(Get(KEY3, &value));

  // Force flush to SST. Increments the statistics counters.
  ASSERT_OK(Flush());

  // Collect statistics.
  uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}

TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
  Options options = CurrentOptions();
  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
  options.write_buffer_size = 67108864;

  ASSERT_OK(TryReopen(options));

  const size_t NUM_REPEAT = 1000;
  const size_t RAND_VALUES_LENGTH = 42;
  const std::string KEY1 = "key1";
  const std::string KEY2 = "key2";
  const std::string KEY3 = "key3";
  const std::string KEY4 = "key4";
  const std::string KEY5 = "key5";
  const std::string KEY6 = "key6";
  const std::string VALUE3 = "value3";

  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;

  Random rnd(301);
  // Insertion of K-V pairs, multiple times.
  // Also insert DeleteRanges.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY1.size() + p_v1.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY2.size() + p_v2.size() + sizeof(uint64_t);
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        KEY3.size() + p_v3.size() + sizeof(uint64_t);
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
                               KEY2));
    // Note: DeleteRange has an exclusive upper bound, e.g. here [KEY2,KEY3)
    // is deleted.
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
                               KEY3));
    // Delete ranges are stored as regular K-V pairs, with key=STARTKEY,
    // value=ENDKEY.
    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
        (KEY1.size() + KEY2.size() + sizeof(uint64_t)) +
        (KEY2.size() + KEY3.size() + sizeof(uint64_t));
  }

  // The memtable data bytes include the "garbage"
  // bytes along with the useful payload.
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;

  // Note: one set of DeleteRanges for (KEY1, KEY2) and (KEY2, KEY3) is written
  // to the SSTable to propagate the DeleteRange operations to K-V pairs that
  // could have been inserted into the database during past flush operations.
  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
      (KEY1.size() + KEY2.size() + sizeof(uint64_t)) +
      (KEY2.size() + KEY3.size() + sizeof(uint64_t));

  // Overwrite KEY3 with a known value (VALUE3).
  // Note that during the whole time KEY3 has never been deleted
  // by the DeleteRanges.
  ASSERT_OK(Put(KEY3, VALUE3));
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      KEY3.size() + VALUE3.size() + sizeof(uint64_t);

  // Additional useful payload.
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY4, KEY5));
  ASSERT_OK(
      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY5, KEY6));

  // Add useful payload to the memtable data bytes:
  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
      (KEY4.size() + KEY5.size() + sizeof(uint64_t)) +
      (KEY5.size() + KEY6.size() + sizeof(uint64_t));

  // We assert that the K-V pairs have been successfully deleted.
  PinnableSlice value;
  ASSERT_NOK(Get(KEY1, &value));
  ASSERT_NOK(Get(KEY2, &value));
  // And that KEY3's value is correct.
  ASSERT_OK(Get(KEY3, &value));
  ASSERT_EQ(value, VALUE3);

  // Force flush to SST. Increments the statistics counters.
  ASSERT_OK(Flush());

  // Collect statistics.
  uint64_t mem_data_bytes =
      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  uint64_t mem_garbage_bytes =
      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);

  Close();
}

#ifndef ROCKSDB_LITE
// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
 public:
  TestFlushListener(Env* env, DBFlushTest* test)
      : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
    db_closed = false;
  }

  ~TestFlushListener() override {
    prev_fc_info_.status.PermitUncheckedError();  // Ignore the status
  }

  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    // Remember the info for later checking the FlushJobInfo.
    prev_fc_info_ = info;
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
    ASSERT_GT(info.table_properties.data_size, 0U);
    ASSERT_GT(info.table_properties.raw_key_size, 0U);
    ASSERT_GT(info.table_properties.raw_value_size, 0U);
    ASSERT_GT(info.table_properties.num_data_blocks, 0U);
    ASSERT_GT(info.table_properties.num_entries, 0U);
    ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
    ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
  }

  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
    flushed_dbs_.push_back(db);
    flushed_column_family_names_.push_back(info.cf_name);
    if (info.triggered_writes_slowdown) {
      slowdown_count++;
    }
    if (info.triggered_writes_stop) {
      stop_count++;
    }
    // Verify whether the previously created file matches the flushed file.
    ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
    ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
    ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
    ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
    ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);

    // Note: the following chunk relies on the notification pertaining to the
    // database pointed to by DBTestBase::db_, and is thus bypassed when
    // that assumption does not hold (see the test case MultiDBMultiListeners
    // below).
    ASSERT_TRUE(test_);
    if (db == test_->db_) {
      std::vector<std::vector<FileMetaData>> files_by_level;
      test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
                                             &files_by_level);

      ASSERT_FALSE(files_by_level.empty());
      auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
                             [&](const FileMetaData& meta) {
                               return meta.fd.GetNumber() == info.file_number;
                             });
      ASSERT_NE(it, files_by_level[0].end());
      ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
    }

    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
    ASSERT_GT(info.thread_id, 0U);
  }

  std::vector<std::string> flushed_column_family_names_;
  std::vector<DB*> flushed_dbs_;
  int slowdown_count;
  int stop_count;
  bool db_closing;
  std::atomic_bool db_closed;
  TableFileCreationInfo prev_fc_info_;

 protected:
  Env* env_;
  DBFlushTest* test_;
};
#endif  // !ROCKSDB_LITE

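// The following tests exercise the experimental MemPurge feature: when
// experimental_mempurge_threshold > 0, an eligible flush may rewrite the
// live entries of the memtable being flushed into a new memtable instead of
// writing an SST file, which pays off when most of the memtable content is
// garbage (overwrites and deletes).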
TEST_F(DBFlushTest, MemPurgeBasic) {
  Options options = CurrentOptions();

  // The following options are used to enforce several values that
  // may already exist as default values, to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();

  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);

  // Create the DB if it's not already present.
  options.create_if_missing = true;

  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;

  // Prevent memtable in-place updates. Should already be disabled
  // (from the Wiki:
  //   In-place updates can be enabled by toggling on the bool
  //   inplace_update_support flag. However, this flag is by default set to
  //   false because this thread-safe in-place update support is not
  //   compatible with concurrent memtable writes. Note that the bool
  //   allow_concurrent_memtable_write is set to true by default).
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;

  // Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
#ifndef ROCKSDB_LITE
  // Initially deactivate the MemPurge prototype.
  options.experimental_mempurge_threshold = 0.0;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
#else
  // Activate the MemPurge prototype directly.
  // (RocksDB lite does not support dynamic options.)
  options.experimental_mempurge_threshold = 1.0;
#endif  // !ROCKSDB_LITE
  ASSERT_OK(TryReopen(options));

// RocksDB lite does not support dynamic options.
#ifndef ROCKSDB_LITE
  // Dynamically activate the MemPurge prototype without restarting the DB.
  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}}));
#endif

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
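  // "MemPurgeSuccessful" fires when a flush is replaced by an in-memory
  // rewrite, whereas "SSTFileCreated" fires when a flush writes an actual SST
  // file; counting both lets the test tell the two outcomes apart.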

  std::string KEY1 = "IamKey1";
  std::string KEY2 = "IamKey2";
  std::string KEY3 = "IamKey3";
  std::string KEY4 = "IamKey4";
  std::string KEY5 = "IamKey5";
  std::string KEY6 = "IamKey6";
  std::string KEY7 = "IamKey7";
  std::string KEY8 = "IamKey8";
  std::string KEY9 = "IamKey9";
  std::string RNDKEY1, RNDKEY2, RNDKEY3;
  const std::string NOT_FOUND = "NOT_FOUND";

  // Heavy overwrite workload,
  // more than would fit in the maximum allowed memtables.
  Random rnd(719);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_KEYS_LENGTH = 57;
  const size_t RAND_VALUES_LENGTH = 10240;
  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9, p_rv1,
      p_rv2, p_rv3;

  // Insert a very first set of keys that will be
  // mempurged at least once.
  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
  ASSERT_OK(Put(KEY1, p_v1));
  ASSERT_OK(Put(KEY2, p_v2));
  ASSERT_OK(Put(KEY3, p_v3));
  ASSERT_OK(Put(KEY4, p_v4));
  ASSERT_EQ(Get(KEY1), p_v1);
  ASSERT_EQ(Get(KEY2), p_v2);
  ASSERT_EQ(Get(KEY3), p_v3);
  ASSERT_EQ(Get(KEY4), p_v4);

  // Insertion of K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);

    ASSERT_OK(Put(KEY5, p_v5));
    ASSERT_OK(Put(KEY6, p_v6));
    ASSERT_OK(Put(KEY7, p_v7));
    ASSERT_OK(Put(KEY8, p_v8));
    ASSERT_OK(Put(KEY9, p_v9));

    ASSERT_EQ(Get(KEY1), p_v1);
    ASSERT_EQ(Get(KEY2), p_v2);
    ASSERT_EQ(Get(KEY3), p_v3);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);
    ASSERT_EQ(Get(KEY6), p_v6);
    ASSERT_EQ(Get(KEY7), p_v7);
    ASSERT_EQ(Get(KEY8), p_v8);
    ASSERT_EQ(Get(KEY9), p_v9);
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that there were no SST files created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;

  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Insertion of K-V pairs, no overwrites.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create random key strings of RAND_KEYS_LENGTH bytes and value strings
    // of RAND_VALUES_LENGTH bytes.
    RNDKEY1 = rnd.RandomString(RAND_KEYS_LENGTH);
    RNDKEY2 = rnd.RandomString(RAND_KEYS_LENGTH);
    RNDKEY3 = rnd.RandomString(RAND_KEYS_LENGTH);
    p_rv1 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_rv2 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_rv3 = rnd.RandomString(RAND_VALUES_LENGTH);

    ASSERT_OK(Put(RNDKEY1, p_rv1));
    ASSERT_OK(Put(RNDKEY2, p_rv2));
    ASSERT_OK(Put(RNDKEY3, p_rv3));

    ASSERT_EQ(Get(KEY1), p_v1);
    ASSERT_EQ(Get(KEY2), p_v2);
    ASSERT_EQ(Get(KEY3), p_v3);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);
    ASSERT_EQ(Get(KEY6), p_v6);
    ASSERT_EQ(Get(KEY7), p_v7);
    ASSERT_EQ(Get(KEY8), p_v8);
    ASSERT_EQ(Get(KEY9), p_v9);
    ASSERT_EQ(Get(RNDKEY1), p_rv1);
    ASSERT_EQ(Get(RNDKEY2), p_rv2);
    ASSERT_EQ(Get(RNDKEY3), p_rv3);
  }

  // Assert that at least one flush to storage has been performed
  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
  // (which will consequently increase the number of mempurges recorded too).
  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);

  // Assert that there is no data corruption, even with
  // a flush to storage.
  ASSERT_EQ(Get(KEY1), p_v1);
  ASSERT_EQ(Get(KEY2), p_v2);
  ASSERT_EQ(Get(KEY3), p_v3);
  ASSERT_EQ(Get(KEY4), p_v4);
  ASSERT_EQ(Get(KEY5), p_v5);
  ASSERT_EQ(Get(KEY6), p_v6);
  ASSERT_EQ(Get(KEY7), p_v7);
  ASSERT_EQ(Get(KEY8), p_v8);
  ASSERT_EQ(Get(KEY9), p_v9);
  ASSERT_EQ(Get(RNDKEY1), p_rv1);
  ASSERT_EQ(Get(RNDKEY2), p_rv2);
  ASSERT_EQ(Get(RNDKEY3), p_rv3);

  Close();
}

// RocksDB lite does not support dynamic options.
#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, MemPurgeBasicToggle) {
  Options options = CurrentOptions();

  // The following options are used to enforce several values that
  // may already exist as default values, to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();

  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);

  // Create the DB if it's not already present.
  options.create_if_missing = true;

  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;

  // Prevent memtable in-place updates. Should already be disabled
  // (from the Wiki:
  //   In-place updates can be enabled by toggling on the bool
  //   inplace_update_support flag. However, this flag is by default set to
  //   false because this thread-safe in-place update support is not
  //   compatible with concurrent memtable writes. Note that the bool
  //   allow_concurrent_memtable_write is set to true by default).
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;

  // Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Initially deactivate the MemPurge prototype.
  // (Negative values are equivalent to 0.0.)
  options.experimental_mempurge_threshold = -25.3;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);

  ASSERT_OK(TryReopen(options));
  // Dynamically activate the MemPurge prototype without restarting the DB.
  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  // Values greater than 1.0 are equivalent to 1.0.
  ASSERT_OK(
      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}}));
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  const size_t KVSIZE = 3;
  std::vector<std::string> KEYS(KVSIZE);
  for (size_t k = 0; k < KVSIZE; k++) {
    KEYS[k] = "IamKey" + std::to_string(k);
  }

  std::vector<std::string> RNDVALS(KVSIZE);
  const std::string NOT_FOUND = "NOT_FOUND";

  // Heavy overwrite workload,
  // more than would fit in the maximum allowed memtables.
  Random rnd(719);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_VALUES_LENGTH = 10240;

  // Insertion of K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
    for (size_t j = 0; j < KEYS.size(); j++) {
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that there were no SST files created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;

  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Dynamically deactivate MemPurge.
  ASSERT_OK(
      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}}));

  // Insertion of K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
    for (size_t j = 0; j < KEYS.size(); j++) {
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
    }
  }

  // With MemPurge deactivated, no mempurge at all is expected.
  const uint32_t ZERO = 0;
  // Assert that at least one flush to storage has been performed.
  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
  // The mempurge count is expected to stay at 0 once the options are updated;
  // we expect no mempurge at all.
  EXPECT_EQ(mempurge_count.exchange(0), ZERO);

  Close();
}
// End of MemPurgeBasicToggle, which is not supported with RocksDB LITE
// because it relies on dynamically changing the option flag
// experimental_mempurge_threshold.
#endif  // !ROCKSDB_LITE

// At the moment, the MemPurge feature is deactivated
// when atomic_flush is enabled. This is because the level
// of garbage between column families is not guaranteed to
// be consistent, therefore one CF could hypothetically
// trigger a MemPurge while another CF would trigger
// a regular Flush.
TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) {
  Options options = CurrentOptions();

  // The following options are used to enforce several values that
  // may already exist as default values, to make this test resilient
  // to default value updates in the future.
  options.statistics = CreateDBStatistics();

  // Record all statistics.
  options.statistics->set_stats_level(StatsLevel::kAll);

  // Create the DB if it's not already present.
  options.create_if_missing = true;

  // Useful for now as we are trying to compare uncompressed data savings on
  // flush().
  options.compression = kNoCompression;

  // Prevent memtable in-place updates. Should already be disabled
  // (from the Wiki:
  //   In-place updates can be enabled by toggling on the bool
  //   inplace_update_support flag. However, this flag is by default set to
  //   false because this thread-safe in-place update support is not
  //   compatible with concurrent memtable writes. Note that the bool
  //   allow_concurrent_memtable_write is set to true by default).
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;

  // Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype
  // (values > 1.0 are equivalent to 1.0).
  options.experimental_mempurge_threshold = 153.245;
  // Activate atomic_flush.
  options.atomic_flush = true;

  const std::vector<std::string> new_cf_names = {"pikachu", "eevie"};
  CreateColumnFamilies(new_cf_names, options);

  Close();

  // 3 CFs: the default CF will be filled with overwrites (would normally
  // trigger mempurge); new_cf_names[0] ("pikachu") will be filled with random
  // keys and values (would trigger flush); new_cf_names[1] ("eevie") is not
  // filled with anything beyond one initial K-V pair.
  ReopenWithColumnFamilies(
      {kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(2, "bar", "baz"));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  const size_t KVSIZE = 3;
  std::vector<std::string> KEYS(KVSIZE);
  for (size_t k = 0; k < KVSIZE; k++) {
    KEYS[k] = "IamKey" + std::to_string(k);
  }

  std::string RNDKEY;
  std::vector<std::string> RNDVALS(KVSIZE);
  const std::string NOT_FOUND = "NOT_FOUND";

  // Heavy overwrite workload,
  // more than would fit in the maximum allowed memtables.
  Random rnd(106);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_KEY_LENGTH = 128;
  const size_t RAND_VALUES_LENGTH = 10240;

  // Insertion of K-V pairs, multiple times (overwrites).
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    for (size_t j = 0; j < KEYS.size(); j++) {
      RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
      ASSERT_OK(Put(1, RNDKEY, RNDVALS[j]));
      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
      ASSERT_EQ(Get(1, RNDKEY), RNDVALS[j]);
    }
  }

  // Check that there was no mempurge because the atomic_flush option is true.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0;
  // Check that at least one SST file was created during flush.
  const uint32_t EXPECTED_SST_COUNT = 1;

  EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT);

  Close();
}

TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
  Options options = CurrentOptions();

  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
#ifndef ROCKSDB_LITE
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
#endif  // !ROCKSDB_LITE
  // Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 15.0;

  ASSERT_OK(TryReopen(options));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::string KEY1 = "ThisIsKey1";
  std::string KEY2 = "ThisIsKey2";
  std::string KEY3 = "ThisIsKey3";
  std::string KEY4 = "ThisIsKey4";
  std::string KEY5 = "ThisIsKey5";
  const std::string NOT_FOUND = "NOT_FOUND";

  Random rnd(117);
  const size_t NUM_REPEAT = 100;
  const size_t RAND_VALUES_LENGTH = 10240;

  std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5;
  int count = 0;
  const int EXPECTED_COUNT_FORLOOP = 3;
  const int EXPECTED_COUNT_END = 4;

  ReadOptions ropt;
  ropt.pin_data = true;
  ropt.total_order_seek = true;
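  // pin_data keeps the iterator's key/value slices valid without extra
  // copies while the iterator is live; total_order_seek makes the scan visit
  // every live key in order even when prefix-seek optimizations are
  // configured.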
  Iterator* iter = nullptr;

  // Insertion of K-V pairs, multiple times.
  // Also insert Deletes and DeleteRanges.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
    p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v3b = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY1, p_v1));
    ASSERT_OK(Put(KEY2, p_v2));
    ASSERT_OK(Put(KEY3, p_v3));
    ASSERT_OK(Put(KEY4, p_v4));
    ASSERT_OK(Put(KEY5, p_v5));
    ASSERT_OK(Delete(KEY2));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
                               KEY4));
    ASSERT_OK(Put(KEY3, p_v3b));
    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
                               KEY3));
    ASSERT_OK(Delete(KEY1));

    ASSERT_EQ(Get(KEY1), NOT_FOUND);
    ASSERT_EQ(Get(KEY2), NOT_FOUND);
    ASSERT_EQ(Get(KEY3), p_v3b);
    ASSERT_EQ(Get(KEY4), p_v4);
    ASSERT_EQ(Get(KEY5), p_v5);

    iter = db_->NewIterator(ropt);
    iter->SeekToFirst();
    count = 0;
    for (; iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      key = (iter->key()).ToString(false);
      value = (iter->value()).ToString(false);
      if (key.compare(KEY3) == 0) {
        ASSERT_EQ(value, p_v3b);
      } else if (key.compare(KEY4) == 0) {
        ASSERT_EQ(value, p_v4);
      } else if (key.compare(KEY5) == 0) {
        ASSERT_EQ(value, p_v5);
      } else {
        ASSERT_EQ(value, NOT_FOUND);
      }
      count++;
    }

    // The expected count here is 3: KEY3, KEY4, KEY5.
    ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP);
    if (iter) {
      delete iter;
    }
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that there were no SST files created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;

  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Additional test for the iterator + mempurge interaction.
  ASSERT_OK(Put(KEY2, p_v2));
  iter = db_->NewIterator(ropt);
  iter->SeekToFirst();
  ASSERT_OK(Put(KEY4, p_v4));
  count = 0;
  for (; iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    key = (iter->key()).ToString(false);
    value = (iter->value()).ToString(false);
    if (key.compare(KEY2) == 0) {
      ASSERT_EQ(value, p_v2);
    } else if (key.compare(KEY3) == 0) {
      ASSERT_EQ(value, p_v3b);
    } else if (key.compare(KEY4) == 0) {
      ASSERT_EQ(value, p_v4);
    } else if (key.compare(KEY5) == 0) {
      ASSERT_EQ(value, p_v5);
    } else {
      ASSERT_EQ(value, NOT_FOUND);
    }
    count++;
  }

  // The expected count here is 4: KEY2, KEY3, KEY4, KEY5.
  ASSERT_EQ(count, EXPECTED_COUNT_END);
  if (iter) {
    delete iter;
  }

  Close();
}

// A compaction filter that will be invoked
// at flush time and will update the value of a KV pair
// if the key string is "lower" than the filtered_key_ string.
class ConditionalUpdateFilter : public CompactionFilter {
 public:
  explicit ConditionalUpdateFilter(const std::string* filtered_key)
      : filtered_key_(filtered_key) {}
  bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
              std::string* new_value, bool* value_changed) const override {
    // If key < filtered_key_, update the value of the KV-pair.
    if (key.compare(*filtered_key_) < 0) {
      assert(new_value != nullptr);
      *new_value = NEW_VALUE;
      *value_changed = true;
    }
    return false /*do not remove this KV-pair*/;
  }

  const char* Name() const override { return "ConditionalUpdateFilter"; }

 private:
  const std::string* filtered_key_;
};
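// Note: Filter() returning false keeps the entry; when *value_changed is set
// to true, the kept entry carries the rewritten value from *new_value.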

class ConditionalUpdateFilterFactory : public CompactionFilterFactory {
 public:
  explicit ConditionalUpdateFilterFactory(const Slice& filtered_key)
      : filtered_key_(filtered_key.ToString()) {}

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& /*context*/) override {
    return std::unique_ptr<CompactionFilter>(
        new ConditionalUpdateFilter(&filtered_key_));
  }

  const char* Name() const override { return "ConditionalUpdateFilterFactory"; }

  bool ShouldFilterTableFileCreation(
      TableFileCreationReason reason) const override {
    // This compaction filter will be invoked
    // at flush time (and therefore at MemPurge time).
    return (reason == TableFileCreationReason::kFlush);
  }

 private:
  std::string filtered_key_;
};

TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
  Options options = CurrentOptions();

  std::string KEY1 = "ThisIsKey1";
  std::string KEY2 = "ThisIsKey2";
  std::string KEY3 = "ThisIsKey3";
  std::string KEY4 = "ThisIsKey4";
  std::string KEY5 = "ThisIsKey5";
  std::string KEY6 = "ThisIsKey6";
  std::string KEY7 = "ThisIsKey7";
  std::string KEY8 = "ThisIsKey8";
  std::string KEY9 = "ThisIsKey9";
  const std::string NOT_FOUND = "NOT_FOUND";

  options.statistics = CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.inplace_update_support = false;
  options.allow_concurrent_memtable_write = true;
#ifndef ROCKSDB_LITE
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
#endif  // !ROCKSDB_LITE
  // Create a ConditionalUpdate compaction filter
  // that will update all the values of the KV pairs
  // whose keys are "lower" than KEY4.
  options.compaction_filter_factory =
      std::make_shared<ConditionalUpdateFilterFactory>(KEY4);

  // Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 26.55;

  ASSERT_OK(TryReopen(options));

  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:MemPurgeSuccessful",
      [&](void* /*arg*/) { mempurge_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(53);
  const size_t NUM_REPEAT = 1000;
  const size_t RAND_VALUES_LENGTH = 10240;
  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9;

  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
  p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
  ASSERT_OK(Put(KEY1, p_v1));
  ASSERT_OK(Put(KEY2, p_v2));
  ASSERT_OK(Put(KEY3, p_v3));
  ASSERT_OK(Put(KEY4, p_v4));
  ASSERT_OK(Put(KEY5, p_v5));
  ASSERT_OK(Delete(KEY1));

  // Insertion of K-V pairs, multiple times.
  for (size_t i = 0; i < NUM_REPEAT; i++) {
    // Create value strings of arbitrary
    // length RAND_VALUES_LENGTH bytes.
    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
    ASSERT_OK(Put(KEY6, p_v6));
    ASSERT_OK(Put(KEY7, p_v7));
    ASSERT_OK(Put(KEY8, p_v8));
    ASSERT_OK(Put(KEY9, p_v9));

    ASSERT_OK(Delete(KEY7));
  }

  // Check that there was at least one mempurge.
  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
  // Check that there were no SST files created during flush.
  const uint32_t EXPECTED_SST_COUNT = 0;

  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);

  // Verify that the ConditionalUpdateFilter
  // updated the values of KEY2 and KEY3, and not those of KEY4 and KEY5.
  ASSERT_EQ(Get(KEY1), NOT_FOUND);
  ASSERT_EQ(Get(KEY2), NEW_VALUE);
  ASSERT_EQ(Get(KEY3), NEW_VALUE);
  ASSERT_EQ(Get(KEY4), p_v4);
  ASSERT_EQ(Get(KEY5), p_v5);
}
1442
1443 TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
1444 Options options = CurrentOptions();
1445
1446 options.statistics = CreateDBStatistics();
1447 options.statistics->set_stats_level(StatsLevel::kAll);
1448 options.create_if_missing = true;
1449 options.compression = kNoCompression;
1450 options.inplace_update_support = false;
1451 options.allow_concurrent_memtable_write = true;
1452
1453 // Enforce size of a single MemTable to 128KB.
1454 options.write_buffer_size = 128 << 10;
1455 // Activate the MemPurge prototype
1456 // (values >1.0 are equivalent to 1.0).
1457 options.experimental_mempurge_threshold = 2.5;
1458
1459 ASSERT_OK(TryReopen(options));
1460
1461 const size_t KVSIZE = 10;
1462
1463 do {
1464 CreateAndReopenWithCF({"pikachu"}, options);
1465 ASSERT_OK(Put(1, "foo", "v1"));
1466 ASSERT_OK(Put(1, "baz", "v5"));
1467
1468 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1469 ASSERT_EQ("v1", Get(1, "foo"));
1470
1471 ASSERT_EQ("v1", Get(1, "foo"));
1472 ASSERT_EQ("v5", Get(1, "baz"));
1473 ASSERT_OK(Put(0, "bar", "v2"));
1474 ASSERT_OK(Put(1, "bar", "v2"));
1475 ASSERT_OK(Put(1, "foo", "v3"));
1476 std::atomic<uint32_t> mempurge_count{0};
1477 std::atomic<uint32_t> sst_count{0};
1478 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1479 "DBImpl::FlushJob:MemPurgeSuccessful",
1480 [&](void* /*arg*/) { mempurge_count++; });
1481 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1482 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
1483 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1484
1485 std::vector<std::string> keys;
1486 for (size_t k = 0; k < KVSIZE; k++) {
1487 keys.push_back("IamKey" + std::to_string(k));
1488 }
1489
1490 std::string RNDKEY, RNDVALUE;
1491 const std::string NOT_FOUND = "NOT_FOUND";
1492
1493 // Heavy overwrite workload,
1494 // more than would fit in the maximum allowed number of memtables.
1495 Random rnd(719);
1496 const size_t NUM_REPEAT = 100;
1497 const size_t RAND_KEY_LENGTH = 4096;
1498 const size_t RAND_VALUES_LENGTH = 1024;
1499 std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);
1500
1501 // Insert an initial set of keys that will be
1502 // mempurged at least once.
1503 for (size_t k = 0; k < KVSIZE / 2; k++) {
1504 values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
1505 values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
1506 }
1507
1508 // Insert keys[0:KVSIZE/2] to
1509 // both 'default' and 'pikachu' CFs.
1510 for (size_t k = 0; k < KVSIZE / 2; k++) {
1511 ASSERT_OK(Put(0, keys[k], values_default[k]));
1512 ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
1513 }
1514
1515 // Check that the insertion was seamless.
1516 for (size_t k = 0; k < KVSIZE / 2; k++) {
1517 ASSERT_EQ(Get(0, keys[k]), values_default[k]);
1518 ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
1519 }
1520
1521 // Insertion of K-V pairs, multiple times (overwrites)
1522 // into 'default' CF. Will trigger mempurge.
1523 for (size_t j = 0; j < NUM_REPEAT; j++) {
1524 // Create random value strings of length RAND_VALUES_LENGTH bytes.
1525 for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
1526 values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
1527 }
1528
1529 // Insert K-V into default CF.
1530 for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
1531 ASSERT_OK(Put(0, keys[k], values_default[k]));
1532 }
1533
1534 // Check key validity, for all keys, both in
1535 // default and pikachu CFs.
1536 for (size_t k = 0; k < KVSIZE; k++) {
1537 ASSERT_EQ(Get(0, keys[k]), values_default[k]);
1538 }
1539 // Note that at this point, only keys[0:KVSIZE/2]
1540 // have been inserted into Pikachu.
1541 for (size_t k = 0; k < KVSIZE / 2; k++) {
1542 ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
1543 }
1544 }
1545
1546 // Insertion of K-V pairs, multiple times (overwrites)
1547 // into 'pikachu' CF. Will trigger mempurge.
1548 // Check that we keep the older logs for 'default' imm().
1549 for (size_t j = 0; j < NUM_REPEAT; j++) {
1550 // Create random value strings of length RAND_VALUES_LENGTH bytes.
1551 for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
1552 values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
1553 }
1554
1555 // Insert K-V into pikachu CF.
1556 for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
1557 ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
1558 }
1559
1560 // Check key validity, for all keys,
1561 // both in default and pikachu.
1562 for (size_t k = 0; k < KVSIZE; k++) {
1563 ASSERT_EQ(Get(0, keys[k]), values_default[k]);
1564 ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
1565 }
1566 }
1567
1568 // Check that there was at least one mempurge
1569 const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
1570 // Check that no SST files were created during flush.
1571 const uint32_t EXPECTED_SST_COUNT = 0;
1572
1573 EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
1574 if (options.experimental_mempurge_threshold ==
1575 std::numeric_limits<double>::max()) {
1576 EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
1577 }
1578
1579 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1580 // Check that there was no data corruption anywhere,
1581 // neither in the 'default' nor in the 'pikachu' CF.
1582 ASSERT_EQ("v3", Get(1, "foo"));
1583 ASSERT_OK(Put(1, "foo", "v4"));
1584 ASSERT_EQ("v4", Get(1, "foo"));
1585 ASSERT_EQ("v2", Get(1, "bar"));
1586 ASSERT_EQ("v5", Get(1, "baz"));
1587 // Check keys in 'default' and 'pikachu'.
1588 // keys[0:KVSIZE/2] were guaranteed to be contained
1589 // in the imm() at Reopen/recovery time.
1590 for (size_t k = 0; k < KVSIZE; k++) {
1591 ASSERT_EQ(Get(0, keys[k]), values_default[k]);
1592 ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
1593 }
1594 // Insertion of random K-V pairs to trigger
1595 // a flush in the Pikachu CF.
1596 for (size_t j = 0; j < NUM_REPEAT; j++) {
1597 RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
1598 RNDVALUE = rnd.RandomString(RAND_VALUES_LENGTH);
1599 ASSERT_OK(Put(1, RNDKEY, RNDVALUE));
1600 }
1601 // Assert that there was at least one flush to storage.
1602 EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
1603 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1604 ASSERT_EQ("v4", Get(1, "foo"));
1605 ASSERT_EQ("v2", Get(1, "bar"));
1606 ASSERT_EQ("v5", Get(1, "baz"));
1607 // Since values in default are held in the mutable mem()
1608 // and imm(), check that the flush in pikachu didn't
1609 // affect these values.
1610 for (size_t k = 0; k < KVSIZE; k++) {
1611 ASSERT_EQ(Get(0, keys[k]), values_default[k]);
1612 ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
1613 }
1614 ASSERT_EQ(Get(1, RNDKEY), RNDVALUE);
1615 } while (ChangeWalOptions());
1616 }
1617
1618 TEST_F(DBFlushTest, MemPurgeCorrectLogNumberAndSSTFileCreation) {
1619 // Before our bug fix, we noticed that when 2 memtables were
1620 // being flushed (with one memtable being the output of a
1621 // previous MemPurge and one memtable being a newly-sealed memtable),
1622 // the SST file created was not properly added to the DB version
1623 // (via the VersionEdit obj), leading to data loss (the SST file
1624 // was later being purged as an obsolete file).
1625 // Therefore, we reproduce this scenario to test our fix.
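// (Hedged sketch of the invariant being tested, not part of the test
// itself: every SST file produced by a flush must be recorded in the
// VersionEdit applied through VersionSet::LogAndApply, conceptually:
//   edit.AddFile(0 /*level*/, file_meta_fields...);  // simplified
//   versions->LogAndApply(cfd, mutable_cf_options, &edit, mu, dir);
// A file absent from the applied edit is considered obsolete and is
// deleted, which is the data loss described above.)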
1626 Options options = CurrentOptions();
1627
1628 options.create_if_missing = true;
1629 options.compression = kNoCompression;
1630 options.inplace_update_support = false;
1631 options.allow_concurrent_memtable_write = true;
1632
1633 // Limit the size of a single MemTable to 1MB (1MB = 1048576 bytes).
1634 options.write_buffer_size = 1 << 20;
1635 // Activate the MemPurge prototype.
1636 options.experimental_mempurge_threshold = 1.0;
1637
1638 // Force more than one memtable to be merged per flush.
1639 // This option does not always seem to be enforced, so the
1640 // assertion on num_memtable_at_first_flush below makes sure
1641 // that we are testing the intended scenario.
1642 options.min_write_buffer_number_to_merge = 3;
1643 options.max_write_buffer_number = 5;
1644 options.max_write_buffer_size_to_maintain = 2 * (options.write_buffer_size);
1645 options.disable_auto_compactions = true;
1646 ASSERT_OK(TryReopen(options));
1647
1648 std::atomic<uint32_t> mempurge_count{0};
1649 std::atomic<uint32_t> sst_count{0};
1650 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1651 "DBImpl::FlushJob:MemPurgeSuccessful",
1652 [&](void* /*arg*/) { mempurge_count++; });
1653 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1654 "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
1655 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1656
1657 // Dummy variable used for the following callback function.
1658 uint64_t ZERO = 0;
1659 // We will first execute mempurge operations exclusively.
1660 // Therefore, when the first flush is triggered, we want to make
1661 // sure there are at least 2 memtables being flushed: one output
1662 // from a previous mempurge, and one newly sealed memtable.
1663 // This is when we observed in the past that some SST files created
1664 // were not properly added to the DB version (via the VersionEdit obj).
1665 std::atomic<uint64_t> num_memtable_at_first_flush(0);
1666 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1667 "FlushJob::WriteLevel0Table:num_memtables", [&](void* arg) {
1668 uint64_t* mems_size = reinterpret_cast<uint64_t*>(arg);
1669 // atomic_compare_exchange_strong sometimes updates the value
1670 // of ZERO (the "expected" object), so we make sure ZERO is
1671 // indeed zero before each call.
1672 ZERO = 0;
1673 std::atomic_compare_exchange_strong(&num_memtable_at_first_flush, &ZERO,
1674 *mems_size);
1675 });
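// Background note on the standard C++ semantics relied on above: when
// the comparison fails, std::atomic_compare_exchange_strong copies the
// atomic's current value into the "expected" argument. A minimal
// illustration (not part of the test):
//   std::atomic<uint64_t> n{5};
//   uint64_t expected = 0;
//   std::atomic_compare_exchange_strong(&n, &expected, uint64_t{7});
//   // The exchange fails and 'expected' now holds 5, not 0.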
1676
1677 const std::vector<std::string> KEYS = {
1678 "ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
1679 "ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};
1680 const std::string NOT_FOUND = "NOT_FOUND";
1681
1682 Random rnd(117);
1683 const uint64_t NUM_REPEAT_OVERWRITES = 100;
1684 const uint64_t NUM_RAND_INSERTS = 500;
1685 const uint64_t RAND_VALUES_LENGTH = 10240;
1686
1687 std::string key, value;
1688 std::vector<std::string> values(9, "");
1689
1690 // Keys used to check that no SST file disappeared.
1691 for (uint64_t k = 0; k < 5; k++) {
1692 values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
1693 ASSERT_OK(Put(KEYS[k], values[k]));
1694 }
1695
1696 // Insertion of K-V pairs, multiple times.
1697 // Trigger at least one mempurge and no SST file creation.
1698 for (size_t i = 0; i < NUM_REPEAT_OVERWRITES; i++) {
1699 // Create random value strings of length RAND_VALUES_LENGTH bytes.
1700 for (uint64_t k = 5; k < values.size(); k++) {
1701 values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
1702 ASSERT_OK(Put(KEYS[k], values[k]));
1703 }
1704 // Check database consistency.
1705 for (uint64_t k = 0; k < values.size(); k++) {
1706 ASSERT_EQ(Get(KEYS[k]), values[k]);
1707 }
1708 }
1709
1710 // Check that there was at least one mempurge
1711 uint32_t expected_min_mempurge_count = 1;
1712 // Check that no SST files were created during flush.
1713 uint32_t expected_sst_count = 0;
1714 EXPECT_GE(mempurge_count.load(), expected_min_mempurge_count);
1715 EXPECT_EQ(sst_count.load(), expected_sst_count);
1716
1717 // Trigger an SST file creation and no mempurge.
1718 for (size_t i = 0; i < NUM_RAND_INSERTS; i++) {
1719 key = rnd.RandomString(RAND_VALUES_LENGTH);
1720 // Create random value strings of length RAND_VALUES_LENGTH bytes.
1721 value = rnd.RandomString(RAND_VALUES_LENGTH);
1722 ASSERT_OK(Put(key, value));
1723 // Check database consistency.
1724 for (uint64_t k = 0; k < values.size(); k++) {
1725 ASSERT_EQ(Get(KEYS[k]), values[k]);
1726 }
1727 ASSERT_EQ(Get(key), value);
1728 }
1729
1730 // Check that at least one SST file was created during flush.
1731 expected_sst_count = 1;
1732 EXPECT_GE(sst_count.load(), expected_sst_count);
1733
1734 // Oddly enough, num_memtable_at_first_flush is not enforced to be
1735 // equal to min_write_buffer_number_to_merge, so we assert instead that
1736 // the first SST file creation flushed at least two memtables: one
1737 // output memtable from a previous mempurge and one newly sealed
1738 // memtable. This is the scenario where some SST files created were
1739 // not properly added to the DB version before our bug fix.
1740 ASSERT_GE(num_memtable_at_first_flush.load(), 2);
1741
1742 // Check that no data was lost after SST file creation.
1743 for (uint64_t k = 0; k < values.size(); k++) {
1744 ASSERT_EQ(Get(KEYS[k]), values[k]);
1745 }
1746 // Extra check of database consistency.
1747 ASSERT_EQ(Get(key), value);
1748
1749 Close();
1750 }
1751
1752 TEST_P(DBFlushDirectIOTest, DirectIO) {
1753 Options options;
1754 options.create_if_missing = true;
1755 options.disable_auto_compactions = true;
1756 options.max_background_flushes = 2;
1757 options.use_direct_io_for_flush_and_compaction = GetParam();
1758 options.env = MockEnv::Create(Env::Default());
1759 SyncPoint::GetInstance()->SetCallBack(
1760 "BuildTable:create_file", [&](void* arg) {
1761 bool* use_direct_writes = static_cast<bool*>(arg);
1762 ASSERT_EQ(*use_direct_writes,
1763 options.use_direct_io_for_flush_and_compaction);
1764 });
1765
1766 SyncPoint::GetInstance()->EnableProcessing();
1767 Reopen(options);
1768 ASSERT_OK(Put("foo", "v"));
1769 FlushOptions flush_options;
1770 flush_options.wait = true;
1771 ASSERT_OK(dbfull()->Flush(flush_options));
1772 Destroy(options);
1773 delete options.env;
1774 }
1775
1776 TEST_F(DBFlushTest, FlushError) {
1777 Options options;
1778 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
1779 new FaultInjectionTestEnv(env_));
1780 options.write_buffer_size = 100;
1781 options.max_write_buffer_number = 4;
1782 options.min_write_buffer_number_to_merge = 3;
1783 options.disable_auto_compactions = true;
1784 options.env = fault_injection_env.get();
1785 Reopen(options);
1786
1787 ASSERT_OK(Put("key1", "value1"));
1788 ASSERT_OK(Put("key2", "value2"));
1789 fault_injection_env->SetFilesystemActive(false);
1790 Status s = dbfull()->TEST_SwitchMemtable();
1791 fault_injection_env->SetFilesystemActive(true);
1792 Destroy(options);
1793 ASSERT_NE(s, Status::OK());
1794 }
1795
1796 TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
1797 // Regression test for bug where manual flush hangs forever when the DB
1798 // is in read-only mode. Verify it now at least returns, despite failing.
1799 Options options;
1800 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
1801 new FaultInjectionTestEnv(env_));
1802 options.env = fault_injection_env.get();
1803 options.max_write_buffer_number = 2;
1804 Reopen(options);
1805
1806 // Trigger a first flush but don't let it run
1807 ASSERT_OK(db_->PauseBackgroundWork());
1808 ASSERT_OK(Put("key1", "value1"));
1809 FlushOptions flush_opts;
1810 flush_opts.wait = false;
1811 ASSERT_OK(db_->Flush(flush_opts));
1812
1813 // Write a key to the second memtable so we have something to flush later
1814 // after the DB is in read-only mode.
1815 ASSERT_OK(Put("key2", "value2"));
1816
1817 // Let the first flush continue, hit an error, and put the DB in read-only
1818 // mode.
1819 fault_injection_env->SetFilesystemActive(false);
1820 ASSERT_OK(db_->ContinueBackgroundWork());
1821 // We injected the error into the env, so the returned status is not OK.
1822 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
1823 #ifndef ROCKSDB_LITE
1824 uint64_t num_bg_errors;
1825 ASSERT_TRUE(
1826 db_->GetIntProperty(DB::Properties::kBackgroundErrors, &num_bg_errors));
1827 ASSERT_GT(num_bg_errors, 0);
1828 #endif // ROCKSDB_LITE
1829
1830 // In the bug scenario, triggering another flush would cause the second flush
1831 // to hang forever. After the fix we expect it to return an error.
1832 ASSERT_NOK(db_->Flush(FlushOptions()));
1833
1834 Close();
1835 }
1836
1837 TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
1838 Options options = CurrentOptions();
1839 options.create_if_missing = true;
1840 CreateAndReopenWithCF({"pikachu"}, options);
1841 SyncPoint::GetInstance()->DisableProcessing();
1842 SyncPoint::GetInstance()->LoadDependency(
1843 {{"DBImpl::FlushMemTable:AfterScheduleFlush",
1844 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
1845 {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
1846 "DBImpl::BackgroundCallFlush:start"},
1847 {"DBImpl::BackgroundCallFlush:start",
1848 "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
1849 SyncPoint::GetInstance()->EnableProcessing();
1850 ASSERT_EQ(2, handles_.size());
1851 ASSERT_OK(Put(1, "key", "value"));
1852 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
1853 port::Thread drop_cf_thr([&]() {
1854 TEST_SYNC_POINT(
1855 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
1856 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
1857 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
1858 handles_.resize(1);
1859 TEST_SYNC_POINT(
1860 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
1861 });
1862 FlushOptions flush_opts;
1863 flush_opts.allow_write_stall = true;
1864 ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
1865 drop_cf_thr.join();
1866 Close();
1867 SyncPoint::GetInstance()->DisableProcessing();
1868 }
1869
1870 #ifndef ROCKSDB_LITE
1871 TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
1872 class TestListener : public EventListener {
1873 public:
1874 void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
1875 // There's only one key in each flush.
1876 ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
1877 ASSERT_NE(0, info.smallest_seqno);
1878 if (info.smallest_seqno == seq1) {
1879 // First flush completed
1880 ASSERT_FALSE(completed1);
1881 completed1 = true;
1882 CheckFlushResultCommitted(db, seq1);
1883 } else {
1884 // Second flush completed
1885 ASSERT_FALSE(completed2);
1886 completed2 = true;
1887 ASSERT_EQ(info.smallest_seqno, seq2);
1888 CheckFlushResultCommitted(db, seq2);
1889 }
1890 }
1891
1892 void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
1893 DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
1894 InstrumentedMutex* mutex = db_impl->mutex();
1895 mutex->Lock();
1896 auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
1897 db->DefaultColumnFamily())
1898 ->cfd();
1899 ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
1900 mutex->Unlock();
1901 }
1902
1903 std::atomic<SequenceNumber> seq1{0};
1904 std::atomic<SequenceNumber> seq2{0};
1905 std::atomic<bool> completed1{false};
1906 std::atomic<bool> completed2{false};
1907 };
1908 std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
1909
1910 SyncPoint::GetInstance()->LoadDependency(
1911 {{"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
1912 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
1913 {"DBImpl::FlushMemTableToOutputFile:Finish",
1914 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
1915 SyncPoint::GetInstance()->SetCallBack(
1916 "FlushJob::WriteLevel0Table", [&listener](void* arg) {
1917 // Wait for the second flush to finish, outside of the mutex.
1918 auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
1919 if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
1920 TEST_SYNC_POINT(
1921 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
1922 "WaitSecond");
1923 }
1924 });
1925
1926 Options options = CurrentOptions();
1927 options.create_if_missing = true;
1928 options.listeners.push_back(listener);
1929 // Setting max_flush_jobs = max_background_jobs / 4 = 2.
1930 options.max_background_jobs = 8;
1931 // Allow 2 immutable memtables.
1932 options.max_write_buffer_number = 3;
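// (max_write_buffer_number also counts the active memtable, so a value
// of 3 allows 1 mutable memtable plus 2 immutable ones.)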
1933 Reopen(options);
1934 SyncPoint::GetInstance()->EnableProcessing();
1935 ASSERT_OK(Put("foo", "v"));
1936 listener->seq1 = db_->GetLatestSequenceNumber();
1937 // t1 will wait for the second flush complete before committing flush result.
1938 auto t1 = port::Thread([&]() {
1939 // flush_opts.wait = true
1940 ASSERT_OK(db_->Flush(FlushOptions()));
1941 });
1942 // Wait for first flush started.
1943 TEST_SYNC_POINT(
1944 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
1945 // The second flush will exit early without committing its result. The
1946 // work is delegated to the first flush.
1947 ASSERT_OK(Put("bar", "v"));
1948 listener->seq2 = db_->GetLatestSequenceNumber();
1949 FlushOptions flush_opts;
1950 flush_opts.wait = false;
1951 ASSERT_OK(db_->Flush(flush_opts));
1952 t1.join();
1953 // Ensure background work is fully finished including listener callbacks
1954 // before accessing listener state.
1955 ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
1956 ASSERT_TRUE(listener->completed1);
1957 ASSERT_TRUE(listener->completed2);
1958 SyncPoint::GetInstance()->DisableProcessing();
1959 SyncPoint::GetInstance()->ClearAllCallBacks();
1960 }
1961 #endif // !ROCKSDB_LITE
1962
1963 TEST_F(DBFlushTest, FlushWithBlob) {
1964 constexpr uint64_t min_blob_size = 10;
1965
1966 Options options;
1967 options.enable_blob_files = true;
1968 options.min_blob_size = min_blob_size;
1969 options.disable_auto_compactions = true;
1970 options.env = env_;
1971
1972 Reopen(options);
1973
1974 constexpr char short_value[] = "short";
1975 static_assert(sizeof(short_value) - 1 < min_blob_size,
1976 "short_value too long");
1977
1978 constexpr char long_value[] = "long_value";
1979 static_assert(sizeof(long_value) - 1 >= min_blob_size,
1980 "long_value too short");
1981
1982 ASSERT_OK(Put("key1", short_value));
1983 ASSERT_OK(Put("key2", long_value));
1984
1985 ASSERT_OK(Flush());
1986
1987 ASSERT_EQ(Get("key1"), short_value);
1988 ASSERT_EQ(Get("key2"), long_value);
1989
1990 VersionSet* const versions = dbfull()->GetVersionSet();
1991 assert(versions);
1992
1993 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
1994 assert(cfd);
1995
1996 Version* const current = cfd->current();
1997 assert(current);
1998
1999 const VersionStorageInfo* const storage_info = current->storage_info();
2000 assert(storage_info);
2001
2002 const auto& l0_files = storage_info->LevelFiles(0);
2003 ASSERT_EQ(l0_files.size(), 1);
2004
2005 const FileMetaData* const table_file = l0_files[0];
2006 assert(table_file);
2007
2008 const auto& blob_files = storage_info->GetBlobFiles();
2009 ASSERT_EQ(blob_files.size(), 1);
2010
2011 const auto& blob_file = blob_files.front();
2012 assert(blob_file);
2013
2014 ASSERT_EQ(table_file->smallest.user_key(), "key1");
2015 ASSERT_EQ(table_file->largest.user_key(), "key2");
2016 ASSERT_EQ(table_file->fd.smallest_seqno, 1);
2017 ASSERT_EQ(table_file->fd.largest_seqno, 2);
2018 ASSERT_EQ(table_file->oldest_blob_file_number,
2019 blob_file->GetBlobFileNumber());
2020
2021 ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
2022
2023 #ifndef ROCKSDB_LITE
2024 const InternalStats* const internal_stats = cfd->internal_stats();
2025 assert(internal_stats);
2026
2027 const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
2028 ASSERT_FALSE(compaction_stats.empty());
2029 ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
2030 ASSERT_EQ(compaction_stats[0].bytes_written_blob,
2031 blob_file->GetTotalBlobBytes());
2032 ASSERT_EQ(compaction_stats[0].num_output_files, 1);
2033 ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
2034
2035 const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
2036 ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
2037 compaction_stats[0].bytes_written +
2038 compaction_stats[0].bytes_written_blob);
2039 #endif // ROCKSDB_LITE
2040 }
2041
2042 TEST_F(DBFlushTest, FlushWithChecksumHandoff1) {
2043 if (mem_env_ || encrypted_env_) {
2044 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2045 return;
2046 }
2047 std::shared_ptr<FaultInjectionTestFS> fault_fs(
2048 new FaultInjectionTestFS(FileSystem::Default()));
2049 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
2050 Options options = CurrentOptions();
2051 options.write_buffer_size = 100;
2052 options.max_write_buffer_number = 4;
2053 options.min_write_buffer_number_to_merge = 3;
2054 options.disable_auto_compactions = true;
2055 options.env = fault_fs_env.get();
2056 options.checksum_handoff_file_types.Add(FileType::kTableFile);
2057 Reopen(options);
2058
2059 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
2060 ASSERT_OK(Put("key1", "value1"));
2061 ASSERT_OK(Put("key2", "value2"));
2062 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
2063
2064 // The callback below switches the handoff checksum type, so the
2065 // hash no longer matches and the write fails.
2066 // Since the file system returns IOStatus::Corruption, it is an
2067 // unrecoverable error.
2068 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2069 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
2070 });
2071 ASSERT_OK(Put("key3", "value3"));
2072 ASSERT_OK(Put("key4", "value4"));
2073 SyncPoint::GetInstance()->EnableProcessing();
2074 Status s = Flush();
2075 ASSERT_EQ(s.severity(),
2076 ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
2077 SyncPoint::GetInstance()->DisableProcessing();
2078 Destroy(options);
2079 Reopen(options);
2080
2081 // The file system does not support checksum handoff. The check
2082 // will be ignored.
2083 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
2084 ASSERT_OK(Put("key5", "value5"));
2085 ASSERT_OK(Put("key6", "value6"));
2086 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
2087
2088 // Each write will be simulated as corrupted.
2089 // Since the file system returns IOStatus::Corruption, it is an
2090 // unrecoverable error.
2091 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
2092 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2093 fault_fs->IngestDataCorruptionBeforeWrite();
2094 });
2095 ASSERT_OK(Put("key7", "value7"));
2096 ASSERT_OK(Put("key8", "value8"));
2097 SyncPoint::GetInstance()->EnableProcessing();
2098 s = Flush();
2099 ASSERT_EQ(s.severity(),
2100 ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
2101 SyncPoint::GetInstance()->DisableProcessing();
2102
2103 Destroy(options);
2104 }
2105
2106 TEST_F(DBFlushTest, FlushWithChecksumHandoff2) {
2107 if (mem_env_ || encrypted_env_) {
2108 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2109 return;
2110 }
2111 std::shared_ptr<FaultInjectionTestFS> fault_fs(
2112 new FaultInjectionTestFS(FileSystem::Default()));
2113 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
2114 Options options = CurrentOptions();
2115 options.write_buffer_size = 100;
2116 options.max_write_buffer_number = 4;
2117 options.min_write_buffer_number_to_merge = 3;
2118 options.disable_auto_compactions = true;
2119 options.env = fault_fs_env.get();
2120 Reopen(options);
2121
2122 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
2123 ASSERT_OK(Put("key1", "value1"));
2124 ASSERT_OK(Put("key2", "value2"));
2125 ASSERT_OK(Flush());
2126
2127 // The option is not set, so checksum handoff will not be triggered.
2128 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2129 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
2130 });
2131 ASSERT_OK(Put("key3", "value3"));
2132 ASSERT_OK(Put("key4", "value4"));
2133 SyncPoint::GetInstance()->EnableProcessing();
2134 ASSERT_OK(Flush());
2135 SyncPoint::GetInstance()->DisableProcessing();
2136 Destroy(options);
2137 Reopen(options);
2138
2139 // The file system does not support checksum handoff. The check
2140 // will be ignored.
2141 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
2142 ASSERT_OK(Put("key5", "value5"));
2143 ASSERT_OK(Put("key6", "value6"));
2144 ASSERT_OK(Flush());
2145
2146 // The option is not set, so checksum handoff will not be triggered.
2147 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
2148 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2149 fault_fs->IngestDataCorruptionBeforeWrite();
2150 });
2151 ASSERT_OK(Put("key7", "value7"));
2152 ASSERT_OK(Put("key8", "value8"));
2153 SyncPoint::GetInstance()->EnableProcessing();
2154 ASSERT_OK(Flush());
2155 SyncPoint::GetInstance()->DisableProcessing();
2156
2157 Destroy(options);
2158 }
2159
2160 TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest1) {
2161 if (mem_env_ || encrypted_env_) {
2162 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2163 return;
2164 }
2165 std::shared_ptr<FaultInjectionTestFS> fault_fs(
2166 new FaultInjectionTestFS(FileSystem::Default()));
2167 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
2168 Options options = CurrentOptions();
2169 options.write_buffer_size = 100;
2170 options.max_write_buffer_number = 4;
2171 options.min_write_buffer_number_to_merge = 3;
2172 options.disable_auto_compactions = true;
2173 options.env = fault_fs_env.get();
2174 options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
2175 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
2176 Reopen(options);
2177
2178 ASSERT_OK(Put("key1", "value1"));
2179 ASSERT_OK(Put("key2", "value2"));
2180 ASSERT_OK(Flush());
2181
2182 // The callback below switches the handoff checksum type, so the
2183 // hash no longer matches and the write fails.
2184 // Since the file system returns IOStatus::Corruption, it is mapped to
2185 // a kFatalError error.
2186 ASSERT_OK(Put("key3", "value3"));
2187 SyncPoint::GetInstance()->SetCallBack(
2188 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2189 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
2190 });
2191 ASSERT_OK(Put("key3", "value3"));
2192 ASSERT_OK(Put("key4", "value4"));
2193 SyncPoint::GetInstance()->EnableProcessing();
2194 Status s = Flush();
2195 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2196 SyncPoint::GetInstance()->DisableProcessing();
2197 Destroy(options);
2198 }
2199
2200 TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) {
2201 if (mem_env_ || encrypted_env_) {
2202 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
2203 return;
2204 }
2205 std::shared_ptr<FaultInjectionTestFS> fault_fs(
2206 new FaultInjectionTestFS(FileSystem::Default()));
2207 std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
2208 Options options = CurrentOptions();
2209 options.write_buffer_size = 100;
2210 options.max_write_buffer_number = 4;
2211 options.min_write_buffer_number_to_merge = 3;
2212 options.disable_auto_compactions = true;
2213 options.env = fault_fs_env.get();
2214 options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
2215 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
2216 Reopen(options);
2217 // The file system does not support checksum handoff. The check
2218 // will be ignored.
2219 ASSERT_OK(Put("key5", "value5"));
2220 ASSERT_OK(Put("key6", "value6"));
2221 ASSERT_OK(Flush());
2222
2223 // Each write will be simulated as corrupted.
2224 // Since the file system returns IOStatus::Corruption, it is mapped to
2225 // kFatalError error.
2226 fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
2227 SyncPoint::GetInstance()->SetCallBack(
2228 "VersionSet::LogAndApply:WriteManifest",
2229 [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
2230 ASSERT_OK(Put("key7", "value7"));
2231 ASSERT_OK(Put("key8", "value8"));
2232 SyncPoint::GetInstance()->EnableProcessing();
2233 Status s = Flush();
2234 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2235 SyncPoint::GetInstance()->DisableProcessing();
2236
2237 Destroy(options);
2238 }
2239
2240 TEST_F(DBFlushTest, PickRightMemtables) {
2241 Options options = CurrentOptions();
2242 DestroyAndReopen(options);
2243 options.create_if_missing = true;
2244
2245 const std::string test_cf_name = "test_cf";
2246 options.max_write_buffer_number = 128;
2247 CreateColumnFamilies({test_cf_name}, options);
2248
2249 Close();
2250
2251 ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options);
2252
2253 ASSERT_OK(db_->Put(WriteOptions(), "key", "value"));
2254
2255 ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value"));
2256
2257 SyncPoint::GetInstance()->DisableProcessing();
2258 SyncPoint::GetInstance()->ClearAllCallBacks();
2259 SyncPoint::GetInstance()->SetCallBack(
2260 "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
2261 ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
2262 auto* cfhi =
2263 static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
2264 assert(cfhi);
2265 ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd()));
2266 });
2267 SyncPoint::GetInstance()->SetCallBack(
2268 "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
2269 auto* job = reinterpret_cast<FlushJob*>(arg);
2270 assert(job);
2271 const auto& mems = job->GetMemTables();
2272 assert(mems.size() == 1);
2273 assert(mems[0]);
2274 ASSERT_EQ(1, mems[0]->GetID());
2275 });
2276 SyncPoint::GetInstance()->EnableProcessing();
2277
2278 ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
2279
2280 SyncPoint::GetInstance()->DisableProcessing();
2281 SyncPoint::GetInstance()->ClearAllCallBacks();
2282 }
2283
2284 class DBFlushTestBlobError : public DBFlushTest,
2285 public testing::WithParamInterface<std::string> {
2286 public:
2287 DBFlushTestBlobError() : sync_point_(GetParam()) {}
2288
2289 std::string sync_point_;
2290 };
2291
2292 INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
2293 ::testing::ValuesIn(std::vector<std::string>{
2294 "BlobFileBuilder::WriteBlobToFile:AddRecord",
2295 "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
2296
2297 TEST_P(DBFlushTestBlobError, FlushError) {
2298 Options options;
2299 options.enable_blob_files = true;
2300 options.disable_auto_compactions = true;
2301 options.env = env_;
2302
2303 Reopen(options);
2304
2305 ASSERT_OK(Put("key", "blob"));
2306
2307 SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
2308 Status* const s = static_cast<Status*>(arg);
2309 assert(s);
2310
2311 (*s) = Status::IOError(sync_point_);
2312 });
2313 SyncPoint::GetInstance()->EnableProcessing();
2314
2315 ASSERT_NOK(Flush());
2316
2317 SyncPoint::GetInstance()->DisableProcessing();
2318 SyncPoint::GetInstance()->ClearAllCallBacks();
2319
2320 VersionSet* const versions = dbfull()->GetVersionSet();
2321 assert(versions);
2322
2323 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
2324 assert(cfd);
2325
2326 Version* const current = cfd->current();
2327 assert(current);
2328
2329 const VersionStorageInfo* const storage_info = current->storage_info();
2330 assert(storage_info);
2331
2332 const auto& l0_files = storage_info->LevelFiles(0);
2333 ASSERT_TRUE(l0_files.empty());
2334
2335 const auto& blob_files = storage_info->GetBlobFiles();
2336 ASSERT_TRUE(blob_files.empty());
2337
2338 // Make sure the files generated by the failed job have been deleted
2339 std::vector<std::string> files;
2340 ASSERT_OK(env_->GetChildren(dbname_, &files));
2341 for (const auto& file : files) {
2342 uint64_t number = 0;
2343 FileType type = kTableFile;
2344
2345 if (!ParseFileName(file, &number, &type)) {
2346 continue;
2347 }
2348
2349 ASSERT_NE(type, kTableFile);
2350 ASSERT_NE(type, kBlobFile);
2351 }
2352
2353 #ifndef ROCKSDB_LITE
2354 const InternalStats* const internal_stats = cfd->internal_stats();
2355 assert(internal_stats);
2356
2357 const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
2358 ASSERT_FALSE(compaction_stats.empty());
2359
2360 if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
2361 ASSERT_EQ(compaction_stats[0].bytes_written, 0);
2362 ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
2363 ASSERT_EQ(compaction_stats[0].num_output_files, 0);
2364 ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
2365 } else {
2366 // SST file writing succeeded; blob file writing failed (during Finish)
2367 ASSERT_GT(compaction_stats[0].bytes_written, 0);
2368 ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
2369 ASSERT_EQ(compaction_stats[0].num_output_files, 1);
2370 ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
2371 }
2372
2373 const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
2374 ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
2375 compaction_stats[0].bytes_written +
2376 compaction_stats[0].bytes_written_blob);
2377 #endif // ROCKSDB_LITE
2378 }
2379
2380 #ifndef ROCKSDB_LITE
2381 TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) {
2382 class SimpleTestFlushListener : public EventListener {
2383 public:
2384 explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {}
2385 ~SimpleTestFlushListener() override {}
2386
2387 void OnFlushBegin(DB* db, const FlushJobInfo& info) override {
2388 ASSERT_EQ(static_cast<uint32_t>(0), info.cf_id);
2389
2390 ASSERT_OK(db->Delete(WriteOptions(), "foo"));
2391 snapshot_ = db->GetSnapshot();
2392 ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
2393
2394 auto* dbimpl = static_cast_with_check<DBImpl>(db);
2395 assert(dbimpl);
2396
2397 ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
2398 auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(cfh);
2399 assert(cfhi);
2400 ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd()));
2401 }
2402
2403 DBFlushTest* test_ = nullptr;
2404 const Snapshot* snapshot_ = nullptr;
2405 };
2406
2407 Options options = CurrentOptions();
2408 options.create_if_missing = true;
2409 auto* listener = new SimpleTestFlushListener(this);
2410 options.listeners.emplace_back(listener);
2411 DestroyAndReopen(options);
2412
2413 ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0"));
2414
2415 ManagedSnapshot snapshot_guard(db_);
2416
2417 ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
2418 ASSERT_OK(db_->Flush(FlushOptions(), default_cf));
2419
2420 const Snapshot* snapshot = listener->snapshot_;
2421 assert(snapshot);
2422
2423 ReadOptions read_opts;
2424 read_opts.snapshot = snapshot;
2425
2426 // Using snapshot should not see "foo".
2427 {
2428 std::string value;
2429 Status s = db_->Get(read_opts, "foo", &value);
2430 ASSERT_TRUE(s.IsNotFound());
2431 }
2432
2433 db_->ReleaseSnapshot(snapshot);
2434 }
2435
2436 TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
2437 Options options = CurrentOptions();
2438 options.create_if_missing = true;
2439 options.allow_2pc = true;
2440 options.atomic_flush = GetParam();
2441 // 64MB so that memtable flush won't be triggered by the small writes.
2442 options.write_buffer_size = (static_cast<size_t>(64) << 20);
2443
2444 // Destroy the DB to recreate as a TransactionDB.
2445 Close();
2446 Destroy(options, true);
2447
2448 // Create a TransactionDB.
2449 TransactionDB* txn_db = nullptr;
2450 TransactionDBOptions txn_db_opts;
2451 txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
2452 ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
2453 ASSERT_NE(txn_db, nullptr);
2454 db_ = txn_db;
2455
2456 // Create two more column families besides the default CF.
2457 std::vector<std::string> cfs = {"puppy", "kitty"};
2458 CreateColumnFamilies(cfs, options);
2459 ASSERT_EQ(handles_.size(), 2);
2460 ASSERT_EQ(handles_[0]->GetName(), cfs[0]);
2461 ASSERT_EQ(handles_[1]->GetName(), cfs[1]);
2462 const size_t kNumCfToFlush = options.atomic_flush ? 2 : 1;
2463
2464 WriteOptions wopts;
2465 TransactionOptions txn_opts;
2466 // txn1 only prepares, but does not commit.
2467 // The WAL containing the prepared but uncommitted data must be kept.
2468 Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
2469 // txn2 not only prepares, but also commits.
2470 Transaction* txn2 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
2471 ASSERT_NE(txn1, nullptr);
2472 ASSERT_NE(txn2, nullptr);
2473 for (size_t i = 0; i < kNumCfToFlush; i++) {
2474 ASSERT_OK(txn1->Put(handles_[i], "k1", "v1"));
2475 ASSERT_OK(txn2->Put(handles_[i], "k2", "v2"));
2476 }
2477 // A txn must be named before prepare.
2478 ASSERT_OK(txn1->SetName("txn1"));
2479 ASSERT_OK(txn2->SetName("txn2"));
2480 // Prepare writes to WAL, but not to memtable. (WriteCommitted)
2481 ASSERT_OK(txn1->Prepare());
2482 ASSERT_OK(txn2->Prepare());
2483 // Commit writes to memtable.
2484 ASSERT_OK(txn2->Commit());
2485 delete txn1;
2486 delete txn2;
2487
2488 // There is still unflushed data in the memtable.
2489 // But since the data is small enough to reside in the active memtable,
2490 // there are no immutable memtables.
2491 for (size_t i = 0; i < kNumCfToFlush; i++) {
2492 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2493 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
2494 ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
2495 }
2496
2497 // Atomically flush memtables;
2498 // the min log with prepared data should be written to MANIFEST.
2499 std::vector<ColumnFamilyHandle*> cfs_to_flush(kNumCfToFlush);
2500 for (size_t i = 0; i < kNumCfToFlush; i++) {
2501 cfs_to_flush[i] = handles_[i];
2502 }
2503 ASSERT_OK(txn_db->Flush(FlushOptions(), cfs_to_flush));
2504
2505 // There is no remaining data in the memtables after flush.
2506 for (size_t i = 0; i < kNumCfToFlush; i++) {
2507 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2508 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
2509 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
2510 ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
2511 }
2512
2513 // The recovered min log number with prepared data should be non-zero.
2514 // In 2PC mode, MinLogNumberToKeep returns the
2515 // VersionSet::min_log_number_to_keep recovered from MANIFEST; if it's 0,
2516 // it means atomic flush didn't write min_log_number_to_keep to MANIFEST.
2517 cfs.push_back(kDefaultColumnFamilyName);
2518 ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
2519 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
2520 ASSERT_TRUE(db_impl->allow_2pc());
2521 ASSERT_NE(db_impl->MinLogNumberToKeep(), 0);
2522 }
2523 #endif // ROCKSDB_LITE
2524
2525 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
2526 Options options = CurrentOptions();
2527 options.create_if_missing = true;
2528 options.atomic_flush = GetParam();
2529 options.write_buffer_size = (static_cast<size_t>(64) << 20);
2530
2531 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2532 size_t num_cfs = handles_.size();
2533 ASSERT_EQ(3, num_cfs);
2534 WriteOptions wopts;
2535 wopts.disableWAL = true;
2536 for (size_t i = 0; i != num_cfs; ++i) {
2537 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
2538 }
2539
2540 for (size_t i = 0; i != num_cfs; ++i) {
2541 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2542 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
2543 ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
2544 }
2545
2546 std::vector<int> cf_ids;
2547 for (size_t i = 0; i != num_cfs; ++i) {
2548 cf_ids.emplace_back(static_cast<int>(i));
2549 }
2550 ASSERT_OK(Flush(cf_ids));
2551
2552 for (size_t i = 0; i != num_cfs; ++i) {
2553 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2554 ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
2555 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
2556 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
2557 }
2558 }
2559
2560 TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
2561 Options options = CurrentOptions();
2562 options.create_if_missing = true;
2563 options.atomic_flush = GetParam();
2564 options.write_buffer_size = (static_cast<size_t>(64) << 20);
2565 CreateAndReopenWithCF({"pikachu"}, options);
2566
2567 const size_t num_cfs = handles_.size();
2568 ASSERT_EQ(num_cfs, 2);
2569 WriteOptions wopts;
2570 for (size_t i = 0; i != num_cfs; ++i) {
2571 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
2572 }
2573
2574 {
2575 // Flush the default CF only.
2576 std::vector<int> cf_ids{0};
2577 ASSERT_OK(Flush(cf_ids));
2578
2579 autovector<ColumnFamilyData*> flushed_cfds;
2580 autovector<autovector<VersionEdit*>> flush_edits;
2581 auto flushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[0]);
2582 flushed_cfds.push_back(flushed_cfh->cfd());
2583 flush_edits.push_back({});
2584 auto unflushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[1]);
2585
2586 ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
2587 flushed_cfds, flush_edits),
2588 unflushed_cfh->cfd()->GetLogNumber());
2589 }
2590
2591 {
2592 // Flush all CFs.
2593 std::vector<int> cf_ids;
2594 for (size_t i = 0; i != num_cfs; ++i) {
2595 cf_ids.emplace_back(static_cast<int>(i));
2596 }
2597 ASSERT_OK(Flush(cf_ids));
2598 uint64_t log_num_after_flush = dbfull()->TEST_GetCurrentLogNumber();
2599
2600 uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t>::max();
2601 autovector<ColumnFamilyData*> flushed_cfds;
2602 autovector<autovector<VersionEdit*>> flush_edits;
2603 for (size_t i = 0; i != num_cfs; ++i) {
2604 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2605 flushed_cfds.push_back(cfh->cfd());
2606 flush_edits.push_back({});
2607 min_log_number_to_keep =
2608 std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber());
2609 }
2610 ASSERT_EQ(min_log_number_to_keep, log_num_after_flush);
2611 ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
2612 flushed_cfds, flush_edits),
2613 min_log_number_to_keep);
2614 }
2615 }
2616
2617 TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
2618 Options options = CurrentOptions();
2619 options.create_if_missing = true;
2620 options.atomic_flush = GetParam();
2621 // 4KB so that we can easily trigger auto flush.
2622 options.write_buffer_size = 4096;
2623
2624 SyncPoint::GetInstance()->LoadDependency(
2625 {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
2626 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
2627 SyncPoint::GetInstance()->EnableProcessing();
2628
2629 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2630 size_t num_cfs = handles_.size();
2631 ASSERT_EQ(3, num_cfs);
2632 WriteOptions wopts;
2633 wopts.disableWAL = true;
2634 for (size_t i = 0; i != num_cfs; ++i) {
2635 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
2636 }
2637 // Keep writing to one of the column families to trigger auto flush.
2638 for (int i = 0; i != 4000; ++i) {
2639 ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,
2640 "key" + std::to_string(i), "value" + std::to_string(i),
2641 wopts));
2642 }
2643
2644 TEST_SYNC_POINT(
2645 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
2646 if (options.atomic_flush) {
2647 for (size_t i = 0; i + 1 != num_cfs; ++i) {
2648 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2649 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
2650 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
2651 }
2652 } else {
2653 for (size_t i = 0; i + 1 != num_cfs; ++i) {
2654 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2655 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
2656 ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
2657 }
2658 }
2659 SyncPoint::GetInstance()->DisableProcessing();
2660 }
2661
2662 TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
2663 bool atomic_flush = GetParam();
2664 if (!atomic_flush) {
2665 return;
2666 }
2667 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
2668 new FaultInjectionTestEnv(env_));
2669 Options options = CurrentOptions();
2670 options.create_if_missing = true;
2671 options.atomic_flush = atomic_flush;
2672 options.env = fault_injection_env.get();
2673 SyncPoint::GetInstance()->DisableProcessing();
2674 SyncPoint::GetInstance()->LoadDependency(
2675 {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
2676 "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
2677 {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
2678 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
2679 SyncPoint::GetInstance()->EnableProcessing();
2680
2681 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2682 size_t num_cfs = handles_.size();
2683 ASSERT_EQ(3, num_cfs);
2684 WriteOptions wopts;
2685 wopts.disableWAL = true;
2686 for (size_t i = 0; i != num_cfs; ++i) {
2687 int cf_id = static_cast<int>(i);
2688 ASSERT_OK(Put(cf_id, "key", "value", wopts));
2689 }
2690 FlushOptions flush_opts;
2691 flush_opts.wait = false;
2692 ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
2693 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
2694 fault_injection_env->SetFilesystemActive(false);
2695 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
2696 for (auto* cfh : handles_) {
2697 // Returns the IO error that happened during flush.
2698 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh));
2699 }
2700 for (size_t i = 0; i != num_cfs; ++i) {
2701 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
2702 ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
2703 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
2704 }
2705 fault_injection_env->SetFilesystemActive(true);
2706 Destroy(options);
2707 }
2708
2709 TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
2710 bool atomic_flush = GetParam();
2711 if (!atomic_flush) {
2712 return;
2713 }
2714 Options options = CurrentOptions();
2715 options.create_if_missing = true;
2716 options.atomic_flush = atomic_flush;
2717 SyncPoint::GetInstance()->DisableProcessing();
2718 SyncPoint::GetInstance()->ClearAllCallBacks();
2719 SyncPoint::GetInstance()->EnableProcessing();
2720
2721 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2722 size_t num_cfs = handles_.size();
2723 ASSERT_EQ(3, num_cfs);
2724 WriteOptions wopts;
2725 wopts.disableWAL = true;
2726 std::vector<int> cf_ids;
2727 for (size_t i = 0; i != num_cfs; ++i) {
2728 int cf_id = static_cast<int>(i);
2729 ASSERT_OK(Put(cf_id, "key", "value", wopts));
2730 cf_ids.push_back(cf_id);
2731 }
2732 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
2733 ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
2734 Destroy(options);
2735 }
2736
2737 TEST_P(DBAtomicFlushTest,
2738 FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
2739 bool atomic_flush = GetParam();
2740 if (!atomic_flush) {
2741 return;
2742 }
2743 Options options = CurrentOptions();
2744 options.create_if_missing = true;
2745 options.atomic_flush = atomic_flush;
2746
2747 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
2748
2749 SyncPoint::GetInstance()->DisableProcessing();
2750 SyncPoint::GetInstance()->ClearAllCallBacks();
2751 SyncPoint::GetInstance()->LoadDependency(
2752 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
2753 "DBAtomicFlushTest::BeforeDropCF"},
2754 {"DBAtomicFlushTest::AfterDropCF",
2755 "DBImpl::BackgroundCallFlush:start"}});
2756 SyncPoint::GetInstance()->EnableProcessing();
2757
2758 size_t num_cfs = handles_.size();
2759 ASSERT_EQ(3, num_cfs);
2760 WriteOptions wopts;
2761 wopts.disableWAL = true;
2762 for (size_t i = 0; i != num_cfs; ++i) {
2763 int cf_id = static_cast<int>(i);
2764 ASSERT_OK(Put(cf_id, "key", "value", wopts));
2765 }
2766 port::Thread user_thread([&]() {
2767 TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
2768 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
2769 TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
2770 });
2771 FlushOptions flush_opts;
2772 flush_opts.wait = true;
2773 ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
2774 user_thread.join();
2775 for (size_t i = 0; i != num_cfs; ++i) {
2776 int cf_id = static_cast<int>(i);
2777 ASSERT_EQ("value", Get(cf_id, "key"));
2778 }
2779
2780 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
2781 num_cfs = handles_.size();
2782 ASSERT_EQ(2, num_cfs);
2783 for (size_t i = 0; i != num_cfs; ++i) {
2784 int cf_id = static_cast<int>(i);
2785 ASSERT_EQ("value", Get(cf_id, "key"));
2786 }
2787 Destroy(options);
2788 }
2789
2790 TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
2791 bool atomic_flush = GetParam();
2792 if (!atomic_flush) {
2793 return;
2794 }
2795 const int kNumKeysTriggerFlush = 4;
2796 Options options = CurrentOptions();
2797 options.create_if_missing = true;
2798 options.atomic_flush = atomic_flush;
2799 options.memtable_factory.reset(
2800 test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
2801 CreateAndReopenWithCF({"pikachu"}, options);
2802
2803 for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
2804 ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
2805 }
2806 SyncPoint::GetInstance()->DisableProcessing();
2807 SyncPoint::GetInstance()->ClearAllCallBacks();
2808 SyncPoint::GetInstance()->EnableProcessing();
2809 ASSERT_OK(Put(0, "key", "value"));
2810 Close();
2811
2812 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
2813 ASSERT_EQ("value", Get(0, "key"));
2814 }
2815
2816 TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
2817 bool atomic_flush = GetParam();
2818 Options options = CurrentOptions();
2819 options.create_if_missing = true;
2820 options.atomic_flush = atomic_flush;
2821 options.max_write_buffer_number = 4;
2822 // Set min_write_buffer_number_to_merge to be greater than 1, so that
2823 // a column family with one memtable in the imm list will not cause
2824 // IsFlushPending to return true when flush_requested_ is false.
2825 options.min_write_buffer_number_to_merge = 2;
2826 CreateAndReopenWithCF({"pikachu"}, options);
2827 ASSERT_EQ(2, handles_.size());
2828 ASSERT_OK(dbfull()->PauseBackgroundWork());
2829 ASSERT_OK(Put(0, "key00", "value00"));
2830 ASSERT_OK(Put(1, "key10", "value10"));
2831 FlushOptions flush_opts;
2832 flush_opts.wait = false;
2833 ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
2834 ASSERT_OK(Put(0, "key01", "value01"));
2835 // Since max_write_buffer_number is 4, the following flush won't cause write
2836 // stall.
2837 ASSERT_OK(dbfull()->Flush(flush_opts));
2838 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
2839 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
2840 handles_[1] = nullptr;
2841 ASSERT_OK(dbfull()->ContinueBackgroundWork());
2842 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
2843 delete handles_[0];
2844 handles_.clear();
2845 }
2846
2847 TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
2848 bool atomic_flush = GetParam();
2849 if (!atomic_flush) {
2850 return;
2851 }
2852 Options options = CurrentOptions();
2853 options.create_if_missing = true;
2854 options.atomic_flush = atomic_flush;
2855 CreateAndReopenWithCF({"pikachu"}, options);
2856 SyncPoint::GetInstance()->DisableProcessing();
2857 SyncPoint::GetInstance()->LoadDependency(
2858 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
2859 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
2860 {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
2861 "DBImpl::BackgroundCallFlush:start"},
2862 {"DBImpl::BackgroundCallFlush:start",
2863 "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
2864 SyncPoint::GetInstance()->EnableProcessing();
2865 ASSERT_EQ(2, handles_.size());
2866 ASSERT_OK(Put(0, "key", "value"));
2867 ASSERT_OK(Put(1, "key", "value"));
2868 auto* cfd_default =
2869 static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
2870 ->cfd();
2871 auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2872 port::Thread drop_cf_thr([&]() {
2873 TEST_SYNC_POINT(
2874 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
2875 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
2876 delete handles_[1];
2877 handles_.resize(1);
2878 TEST_SYNC_POINT(
2879 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
2880 });
2881 FlushOptions flush_opts;
2882 flush_opts.allow_write_stall = true;
2883 ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
2884 flush_opts));
2885 drop_cf_thr.join();
2886 Close();
2887 SyncPoint::GetInstance()->DisableProcessing();
2888 }
2889
2890 TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
2891 bool atomic_flush = GetParam();
2892 if (!atomic_flush) {
2893 return;
2894 }
2895 auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
2896 Options options = CurrentOptions();
2897 options.env = fault_injection_env.get();
2898 options.create_if_missing = true;
2899 options.atomic_flush = atomic_flush;
2900 CreateAndReopenWithCF({"pikachu"}, options);
2901 ASSERT_EQ(2, handles_.size());
2902 for (size_t cf = 0; cf < handles_.size(); ++cf) {
2903 ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
2904 }
2905 SyncPoint::GetInstance()->DisableProcessing();
2906 SyncPoint::GetInstance()->ClearAllCallBacks();
2907 SyncPoint::GetInstance()->SetCallBack(
2908 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
2909 [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
2910 SyncPoint::GetInstance()->EnableProcessing();
2911 FlushOptions flush_opts;
2912 Status s = db_->Flush(flush_opts, handles_);
2913 ASSERT_NOK(s);
2914 fault_injection_env->SetFilesystemActive(true);
2915 Close();
2916 SyncPoint::GetInstance()->ClearAllCallBacks();
2917 }
2918
2919 // In atomic flush, concurrent bg flush threads commit to the MANIFEST in
2920 // serial, in the order of their picked memtables for each column family.
2921 // Only when a bg flush thread finds out that its memtables are the earliest
2922 // unflushed ones for all the included column families will this bg flush
2923 // thread continue to commit to MANIFEST.
2924 // This unit test uses sync points to coordinate the execution of two bg
2925 // threads executing the same sequence of functions, interleaved as follows.
2926 // time                bg1                            bg2
2927 //  |   pick memtables to flush
2928 //  |   flush memtables cf1_m1, cf2_m1
2929 //  |   join MANIFEST write queue
2930 //  |                                  pick memtables to flush
2931 //  |                                  flush memtables cf1_(m1+1)
2932 //  |                                  join MANIFEST write queue
2933 //  |                                  wait to write MANIFEST
2934 //  |   write MANIFEST
2935 //  |   IO error
2936 //  |                                  detect IO error and stop waiting
2937 //  V
TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.atomic_flush = true;
  options.env = fault_injection_env.get();
  // Set a larger value than default so that RocksDB can schedule concurrent
  // background flush threads.
  options.max_background_jobs = 8;
  options.max_write_buffer_number = 8;
  CreateAndReopenWithCF({"pikachu"}, options);

  assert(2 == handles_.size());

  WriteOptions write_opts;
  write_opts.disableWAL = true;

  ASSERT_OK(Put(0, "a", "v_0_a", write_opts));
  ASSERT_OK(Put(1, "a", "v_1_a", write_opts));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

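  // Make bg flush thread 1 wait at the MANIFEST write until bg flush thread 2
  // has joined the MANIFEST write queue and is waiting to commit.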
  SyncPoint::GetInstance()->LoadDependency({
      {"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"},
  });

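  // Record the ids of the two bg flush threads as they start, so the
  // callbacks below can tell them apart.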
  std::thread::id bg_flush_thr1, bg_flush_thr2;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCallFlush:start", [&](void*) {
        if (bg_flush_thr1 == std::thread::id()) {
          bg_flush_thr1 = std::this_thread::get_id();
        } else if (bg_flush_thr2 == std::thread::id()) {
          bg_flush_thr2 = std::this_thread::get_id();
        }
      });

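  // Bg flush thread 2 reaches the wait-to-commit point twice: first with an
  // OK status while it still has to wait for thread 1, then with the injected
  // IOError, after which it must stop waiting rather than block forever.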
  int called = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) {
        if (std::this_thread::get_id() == bg_flush_thr2) {
          const auto* ptr = reinterpret_cast<std::pair<Status, bool>*>(arg);
          assert(ptr);
          if (0 == called) {
            // When bg flush thread 2 reaches here for the first time.
            ASSERT_OK(ptr->first);
            ASSERT_TRUE(ptr->second);
          } else if (1 == called) {
            // When bg flush thread 2 reaches here for the second time.
            ASSERT_TRUE(ptr->first.IsIOError());
            ASSERT_FALSE(ptr->second);
          }
          ++called;
          TEST_SYNC_POINT("BgFlushThr2:WaitToCommit");
        }
      });

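  // Thread 1 announces that it is about to write the MANIFEST; the dependency
  // registered above holds it here until thread 2 is waiting to commit.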
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
      [&](void*) {
        if (std::this_thread::get_id() == bg_flush_thr1) {
          TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest");
        }
      });

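  // While thread 1 is writing the MANIFEST, trigger a second, non-waiting
  // flush of the default CF so that a second bg flush thread gets scheduled.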
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
        if (std::this_thread::get_id() != bg_flush_thr1) {
          return;
        }
        ASSERT_OK(db_->Put(write_opts, "b", "v_1_b"));

        FlushOptions flush_opts;
        flush_opts.wait = false;
        std::vector<ColumnFamilyHandle*> cfhs(1, db_->DefaultColumnFamily());
        ASSERT_OK(dbfull()->Flush(flush_opts, cfhs));
      });

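  // Fail the MANIFEST sync with an injected IOError so that thread 1's commit
  // fails and thread 2 observes the error.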
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
        auto* ptr = reinterpret_cast<IOStatus*>(arg);
        assert(ptr);
        *ptr = IOStatus::IOError("Injected failure");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());

  Close();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

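// Verify that a non-waiting flush returns TryAgain instead of stalling when
// writes have been stopped, i.e. when all write buffers are full and
// background work is paused.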
TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();
  options.max_write_buffer_number = 2;
  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));

  Reopen(options);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::DelayWrite:Start",
        "DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}});
  SyncPoint::GetInstance()->EnableProcessing();

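  // Stop background work, then fill every write buffer so that the next
  // write stalls in DBImpl::DelayWrite.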
  ASSERT_OK(dbfull()->PauseBackgroundWork());
  for (int i = 0; i < options.max_write_buffer_number; ++i) {
    ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
  }
  std::thread stalled_writer([&]() { ASSERT_OK(Put("k", "v")); });

  TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0");

  {
    FlushOptions flush_opts;
    flush_opts.wait = false;
    flush_opts.allow_write_stall = true;
    ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());
  }

  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  stalled_writer.join();

  SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                        testing::Bool());

INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}