// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>
#include <fstream>
#include <memory>
#include <thread>
#include <vector>

#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"

namespace ROCKSDB_NAMESPACE {

// Test variations of WriteImpl. The test parameter picks the options
// configuration, i.e. the write policy (see INSTANTIATE_TEST_CASE_P at the
// bottom of this file).
class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
 public:
  DBWriteTest() : DBTestBase("/db_write_test", /*env_do_fsync=*/true) {}

  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

  void Open() { DBTestBase::Reopen(GetOptions()); }
};

// It is invalid to request a sync write (WriteOptions::sync == true) while
// the WAL is disabled (WriteOptions::disableWAL == true).
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = true;
  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}

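// Build a write group that has a no_slowdown writer in the middle while a
// write stall is being triggered, and verify that removing that writer from
// the queue leaves the remaining writers correctly linked.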
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    dbfull()->Put(wo, key, "bar");
  };
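  // Invoked from the "WriteThread::JoinBatchGroup:Start" sync point below;
  // lets the main thread track how many writers have joined the write queue.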
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule the 4th without waiting
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
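  // Each {A, B} pair below makes execution at sync point B wait until sync
  // point A has been reached, letting the test order the background flush,
  // the stalled write group and the background compaction.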
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  dbfull()->Flush(fopt);

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }

  // Second leader. Will stall writes
  // Build a writers list with no slowdown in the middle:
  //  +-------------+
  //  |  slowdown   +<----+ newest
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  | no slowdown |
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  |  slowdown   +
  //  +-------------+
  threads.emplace_back(write_slowdown_func);
  while (writers != 2) {
    cv.Wait();
  }
  threads.emplace_back(write_no_slowdown_func);
  while (writers != 3) {
    cv.Wait();
  }
  threads.emplace_back(write_slowdown_func);
  while (writers != 4) {
    cv.Wait();
  }

  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
  dbfull()->TEST_WaitForFlushMemTable(nullptr);
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

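// Same scenario as the previous test, with a longer queue of mixed
// slowdown/no_slowdown writers: verify that the write thread does not hang
// when the write stall kicks in.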
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule the 4th without waiting
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  dbfull()->Flush(fopt);

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }
  // Second leader. Will stall writes
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  while (writers != 6) {
    cv.Wait();
  }
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  dbfull()->TEST_WaitForFlushMemTable(nullptr);
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

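// A WAL write failure observed by the write group leader should propagate to
// every follower in the same batch group: with the filesystem deactivated,
// all five concurrent Puts must fail (with manual_wal_flush the error would
// only surface in FlushWAL, which is commented out below because of a TSAN
// false positive).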
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  mock_env->SetFilesystemActive(false);

  // Wait until all threads are linked to the write queue, to make sure they
  // all join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.push_back(port::Thread(
        [&](int index) {
          // All threads should fail.
          auto res = Put("key" + ToString(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_TRUE(res.ok());
            // We should see an fs error when we do the flush.

            // TSAN reports a false alarm for lock-order-inversion but Open
            // and FlushWAL are not run concurrently. Disabling this until
            // TSAN is fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i));
  }
  for (int i = 0; i < kNumThreads; i++) {
    threads[i].join();
  }
  ASSERT_EQ(1, leader_count);
  // Close before mock_env destruct.
  Close();
}

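// With manual_wal_flush enabled, the WAL buffer should remain non-empty after
// a Put until FlushWAL() is called; otherwise it is flushed automatically.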
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // try the 2nd WAL created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}

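// A failed WAL write should put the DB in read-only mode: only the first Put
// hits the injected fs error, but the second one must fail as well, even
// though the filesystem is active again by then.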
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    mock_env->SetFilesystemActive(i != 0);
    auto res = Put("key" + ToString(i), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // Even with manual_wal_flush the 2nd Put should return an error
      // because of the read-only mode
      ASSERT_TRUE(res.ok());
      // We should see an fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    if (!options.manual_wal_flush) {
      ASSERT_FALSE(res.ok());
    }
  }
  // Close before mock_env destruct.
  Close();
}

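// An I/O error hit when the memtable is switched (injected here before the
// writes start, and surfacing once the 1.5 MB write buffer fills up) should
// be reported as a fatal error.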
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  Status s;
  for (int i = 0; i < 4 * 512; ++i) {
    s = Put(Key(i), rnd.RandomString(1024));
    if (!s.ok()) {
      break;
    }
  }
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  mock_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}

// Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest, LockWalInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
  // try the 2nd WAL created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
}

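// Interleave 100 KB writes with the WAL disabled and 1-byte writes with the
// WAL enabled from ten threads, then check the WAL_FILE_BYTES ticker: the
// no-WAL values must never reach the WAL.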
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  std::string wal_key_prefix = "WAL_KEY_";
  std::string no_wal_key_prefix = "K_";
  // 100 KB value for each no-WAL operation
  std::string no_wal_value(1024 * 100, 'X');
  // 1-byte value for each WAL operation
  std::string wal_value = "0";
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
                              no_wal_value, this] {
      for (int i = 0; i < 10; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        std::string no_wal_key = no_wal_key_prefix + std::to_string(t) + "_" +
                                 std::to_string(i);
        this->Put(no_wal_key, no_wal_value, write_option_disable);
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        this->Put(wal_key, wal_value, write_option_default);
        dbfull()->SyncWAL();
      }
      return 0;
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // The written WAL size should be less than 100 KB, even including header
  // and footer overhead.
  ASSERT_LE(bytes_num, 1024 * 100);
}

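// Run every test in this file under the default, concurrent-WAL-write and
// pipelined-write configurations.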
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}