// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <fcntl.h>

#include <algorithm>
#include <cinttypes>
#include <map>
#include <set>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "memtable/hash_linklist_rep.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/checkpoint.h"
#include "table/mock_table.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/mock_time_env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

namespace ROCKSDB_NAMESPACE {

namespace anon {
class AtomicCounter {
 public:
  explicit AtomicCounter(Env* env = NULL)
      : env_(env), cond_count_(&mu_), count_(0) {}

  void Increment() {
    MutexLock l(&mu_);
    count_++;
    cond_count_.SignalAll();
  }

  int Read() {
    MutexLock l(&mu_);
    return count_;
  }

  bool WaitFor(int count) {
    MutexLock l(&mu_);

    uint64_t start = env_->NowMicros();
    while (count_ < count) {
      uint64_t now = env_->NowMicros();
      cond_count_.TimedWait(now + /*1s*/ 1 * 1000 * 1000);
      if (env_->NowMicros() - start > /*10s*/ 10 * 1000 * 1000) {
        return false;
      }
      if (count_ < count) {
        GTEST_LOG_(WARNING) << "WaitFor is taking more time than usual";
      }
    }

    return true;
  }

  void Reset() {
    MutexLock l(&mu_);
    count_ = 0;
    cond_count_.SignalAll();
  }

 private:
  Env* env_;
  port::Mutex mu_;
  port::CondVar cond_count_;
  int count_;
};
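
// A minimal usage sketch (not part of any API contract): tests typically
// bump the counter from a callback and block on WaitFor() in the main
// thread. `env` below stands for whatever Env the test already owns.
//
//   AtomicCounter flushed(env);
//   // ... arrange for a background callback to call flushed.Increment() ...
//   ASSERT_TRUE(flushed.WaitFor(1));  // gives up after roughly 10 seconds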

struct OptionsOverride {
  std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
  // These will be used only if filter_policy is set
  bool partition_filters = false;
  uint64_t metadata_block_size = 1024;

  // Used as a bit mask of individual enums in which to skip an XF test point
  int skip_policy = 0;
};

}  // namespace anon

enum SkipPolicy { kSkipNone = 0, kSkipNoSnapshot = 1, kSkipNoPrefix = 2 };

// A hacky skip list mem table that triggers flush after the number of
// entries exceeds a threshold.
class SpecialMemTableRep : public MemTableRep {
 public:
  explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable,
                              int num_entries_flush)
      : MemTableRep(allocator),
        memtable_(memtable),
        num_entries_flush_(num_entries_flush),
        num_entries_(0) {}

  virtual KeyHandle Allocate(const size_t len, char** buf) override {
    return memtable_->Allocate(len, buf);
  }

  // Insert key into the list.
  // REQUIRES: nothing that compares equal to key is currently in the list.
  virtual void Insert(KeyHandle handle) override {
    num_entries_++;
    memtable_->Insert(handle);
  }

  void InsertConcurrently(KeyHandle handle) override {
    num_entries_++;
    memtable_->Insert(handle);
  }

  // Returns true iff an entry that compares equal to key is in the list.
  virtual bool Contains(const char* key) const override {
    return memtable_->Contains(key);
  }

  virtual size_t ApproximateMemoryUsage() override {
    // Return a high memory usage when number of entries exceeds the threshold
    // to trigger a flush.
    return (num_entries_ < num_entries_flush_) ? 0 : 1024 * 1024 * 1024;
  }

  virtual void Get(const LookupKey& k, void* callback_args,
                   bool (*callback_func)(void* arg,
                                         const char* entry)) override {
    memtable_->Get(k, callback_args, callback_func);
  }

  uint64_t ApproximateNumEntries(const Slice& start_ikey,
                                 const Slice& end_ikey) override {
    return memtable_->ApproximateNumEntries(start_ikey, end_ikey);
  }

  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
    return memtable_->GetIterator(arena);
  }

  virtual ~SpecialMemTableRep() override {}

 private:
  std::unique_ptr<MemTableRep> memtable_;
  int num_entries_flush_;
  int num_entries_;
};

// The factory for the hacky skip list mem table that triggers flush after
// number of entries exceeds a threshold.
class SpecialSkipListFactory : public MemTableRepFactory {
 public:
  // After number of inserts exceeds `num_entries_flush` in a mem table, trigger
  // flush.
  explicit SpecialSkipListFactory(int num_entries_flush)
      : num_entries_flush_(num_entries_flush) {}

  using MemTableRepFactory::CreateMemTableRep;
  virtual MemTableRep* CreateMemTableRep(
      const MemTableRep::KeyComparator& compare, Allocator* allocator,
      const SliceTransform* transform, Logger* /*logger*/) override {
    return new SpecialMemTableRep(
        allocator, factory_.CreateMemTableRep(compare, allocator, transform, 0),
        num_entries_flush_);
  }
  virtual const char* Name() const override { return "SkipListFactory"; }

  bool IsInsertConcurrentlySupported() const override {
    return factory_.IsInsertConcurrentlySupported();
  }

 private:
  SkipListFactory factory_;
  int num_entries_flush_;
};
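
// A hedged usage sketch: tests commonly plug this factory into Options so
// that a memtable is reported as "full" after a fixed number of inserts,
// forcing a flush at a deterministic point. `kNumEntries` is an arbitrary
// illustrative value, not something defined in this header.
//
//   Options options = CurrentOptions();
//   constexpr int kNumEntries = 3;
//   options.memtable_factory.reset(new SpecialSkipListFactory(kNumEntries));
//   Reopen(options);
//   // The write after the third Put() should trigger an automatic flush.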

// Special Env used to delay background operations
class SpecialEnv : public EnvWrapper {
 public:
  explicit SpecialEnv(Env* base, bool time_elapse_only_sleep = false);

  Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    class SSTableFile : public WritableFile {
     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;

     public:
      SSTableFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& base)
          : env_(env), base_(std::move(base)) {}
      Status Append(const Slice& data) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->Append(data);
        }
      }
      Status PositionedAppend(const Slice& data, uint64_t offset) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->PositionedAppend(data, offset);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status RangeSync(uint64_t offset, uint64_t nbytes) override {
        Status s = base_->RangeSync(offset, nbytes);
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Close() override {
        // SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size
        // preallocation size is never passed to base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        Status s = base_->Close();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
          env_->SleepForMicroseconds(100000);
        }
        Status s;
        if (!env_->skip_fsync_) {
          s = base_->Sync();
        }
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      void SetIOPriority(Env::IOPriority pri) override {
        base_->SetIOPriority(pri);
      }
      Env::IOPriority GetIOPriority() override {
        return base_->GetIOPriority();
      }
      bool use_direct_io() const override {
        return base_->use_direct_io();
      }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }
    };
    class ManifestFile : public WritableFile {
     public:
      ManifestFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {}
      Status Append(const Slice& data) override {
        if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated writer error");
        } else {
          return base_->Append(data);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override { return base_->Close(); }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated sync error");
        } else {
          if (env_->skip_fsync_) {
            return Status::OK();
          } else {
            return base_->Sync();
          }
        }
      }
      uint64_t GetFileSize() override { return base_->GetFileSize(); }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };
    class WalFile : public WritableFile {
     public:
      WalFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {
        env_->num_open_wal_file_.fetch_add(1);
      }
      virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
      Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
#endif
        Status s;
        if (env_->log_write_error_.load(std::memory_order_acquire)) {
          s = Status::IOError("simulated writer error");
        } else {
          int slowdown =
              env_->log_write_slowdown_.load(std::memory_order_acquire);
          if (slowdown > 0) {
            env_->SleepForMicroseconds(slowdown);
          }
          s = base_->Append(data);
        }
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:2");
#endif
        return s;
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override {
        // SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size
        // preallocation size is never passed to base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWalFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)

        return base_->Close();
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        if (env_->skip_fsync_) {
          return Status::OK();
        } else {
          return base_->Sync();
        }
      }
      bool IsSyncThreadSafe() const override {
        return env_->is_wal_sync_thread_safe_.load();
      }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };
    class OtherFile : public WritableFile {
     public:
      OtherFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {}
      Status Append(const Slice& data) override { return base_->Append(data); }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override { return base_->Close(); }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        if (env_->skip_fsync_) {
          return Status::OK();
        } else {
          return base_->Sync();
        }
      }
      uint64_t GetFileSize() override { return base_->GetFileSize(); }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }

     private:
      SpecialEnv* env_;
      std::unique_ptr<WritableFile> base_;
    };

    if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
      uint32_t random_number;
      {
        MutexLock l(&rnd_mutex_);
        random_number = rnd_.Uniform(100);
      }
      if (random_number < non_writeable_rate_.load()) {
        return Status::IOError("simulated random write error");
      }
    }

    new_writable_count_++;

    if (non_writable_count_.load() > 0) {
      non_writable_count_--;
      return Status::IOError("simulated write error");
    }

    EnvOptions optimized = soptions;
    if (strstr(f.c_str(), "MANIFEST") != nullptr ||
        strstr(f.c_str(), "log") != nullptr) {
      optimized.use_mmap_writes = false;
      optimized.use_direct_writes = false;
    }

    Status s = target()->NewWritableFile(f, r, optimized);
    if (s.ok()) {
      if (strstr(f.c_str(), ".sst") != nullptr) {
        r->reset(new SSTableFile(this, std::move(*r)));
      } else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
        r->reset(new ManifestFile(this, std::move(*r)));
      } else if (strstr(f.c_str(), "log") != nullptr) {
        r->reset(new WalFile(this, std::move(*r)));
      } else {
        r->reset(new OtherFile(this, std::move(*r)));
      }
    }
    return s;
  }

  Status NewRandomAccessFile(const std::string& f,
                             std::unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     public:
      CountingFile(std::unique_ptr<RandomAccessFile>&& target,
                   anon::AtomicCounter* counter,
                   std::atomic<size_t>* bytes_read)
          : target_(std::move(target)),
            counter_(counter),
            bytes_read_(bytes_read) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        counter_->Increment();
        Status s = target_->Read(offset, n, result, scratch);
        *bytes_read_ += result->size();
        return s;
      }

      virtual Status Prefetch(uint64_t offset, size_t n) override {
        Status s = target_->Prefetch(offset, n);
        *bytes_read_ += n;
        return s;
      }

     private:
      std::unique_ptr<RandomAccessFile> target_;
      anon::AtomicCounter* counter_;
      std::atomic<size_t>* bytes_read_;
    };

    class RandomFailureFile : public RandomAccessFile {
     public:
      RandomFailureFile(std::unique_ptr<RandomAccessFile>&& target,
                        std::atomic<uint64_t>* failure_cnt, uint32_t fail_odd)
          : target_(std::move(target)),
            fail_cnt_(failure_cnt),
            fail_odd_(fail_odd) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        if (Random::GetTLSInstance()->OneIn(fail_odd_)) {
          fail_cnt_->fetch_add(1);
          return Status::IOError("random error");
        }
        return target_->Read(offset, n, result, scratch);
      }

      virtual Status Prefetch(uint64_t offset, size_t n) override {
        return target_->Prefetch(offset, n);
      }

     private:
      std::unique_ptr<RandomAccessFile> target_;
      std::atomic<uint64_t>* fail_cnt_;
      uint32_t fail_odd_;
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    random_file_open_counter_++;
    if (s.ok()) {
      if (count_random_reads_) {
        r->reset(new CountingFile(std::move(*r), &random_read_counter_,
                                  &random_read_bytes_counter_));
      } else if (rand_reads_fail_odd_ > 0) {
        r->reset(new RandomFailureFile(std::move(*r), &num_reads_fails_,
                                       rand_reads_fail_odd_));
      }
    }

    if (s.ok() && soptions.compaction_readahead_size > 0) {
      compaction_readahead_size_ = soptions.compaction_readahead_size;
    }
    return s;
  }

  virtual Status NewSequentialFile(const std::string& f,
                                   std::unique_ptr<SequentialFile>* r,
                                   const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     public:
      CountingFile(std::unique_ptr<SequentialFile>&& target,
                   anon::AtomicCounter* counter)
          : target_(std::move(target)), counter_(counter) {}
      virtual Status Read(size_t n, Slice* result, char* scratch) override {
        counter_->Increment();
        return target_->Read(n, result, scratch);
      }
      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }

     private:
      std::unique_ptr<SequentialFile> target_;
      anon::AtomicCounter* counter_;
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok() && count_sequential_reads_) {
      r->reset(new CountingFile(std::move(*r), &sequential_read_counter_));
    }
    return s;
  }

  virtual void SleepForMicroseconds(int micros) override {
    sleep_counter_.Increment();
    if (no_slowdown_ || time_elapse_only_sleep_) {
      addon_microseconds_.fetch_add(micros);
    }
    if (!no_slowdown_) {
      target()->SleepForMicroseconds(micros);
    }
  }

  void MockSleepForMicroseconds(int64_t micros) {
    sleep_counter_.Increment();
    assert(no_slowdown_);
    addon_microseconds_.fetch_add(micros);
  }

  void MockSleepForSeconds(int64_t seconds) {
    sleep_counter_.Increment();
    assert(no_slowdown_);
    addon_microseconds_.fetch_add(seconds * 1000000);
  }

  virtual Status GetCurrentTime(int64_t* unix_time) override {
    Status s;
    if (time_elapse_only_sleep_) {
      *unix_time = maybe_starting_time_;
    } else {
      s = target()->GetCurrentTime(unix_time);
    }
    if (s.ok()) {
      // mock microseconds elapsed to seconds of time
      *unix_time += addon_microseconds_.load() / 1000000;
    }
    return s;
  }

  virtual uint64_t NowCPUNanos() override {
    now_cpu_count_.fetch_add(1);
    return target()->NowCPUNanos();
  }

  virtual uint64_t NowNanos() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowNanos()) +
           addon_microseconds_.load() * 1000;
  }

  virtual uint64_t NowMicros() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowMicros()) +
           addon_microseconds_.load();
  }

  virtual Status DeleteFile(const std::string& fname) override {
    delete_count_.fetch_add(1);
    return target()->DeleteFile(fname);
  }

  void SetMockSleep(bool enabled = true) { no_slowdown_ = enabled; }

  Status NewDirectory(const std::string& name,
                      std::unique_ptr<Directory>* result) override {
    if (!skip_fsync_) {
      return target()->NewDirectory(name, result);
    } else {
      class NoopDirectory : public Directory {
       public:
        NoopDirectory() {}
        ~NoopDirectory() {}

        Status Fsync() override { return Status::OK(); }
      };

      result->reset(new NoopDirectory());
      return Status::OK();
    }
  }

  // Something to return when mocking current time
  const int64_t maybe_starting_time_;

  Random rnd_;
  port::Mutex rnd_mutex_;  // Lock to protect rnd_

  // sstable Sync() calls are blocked while this is true.
  std::atomic<bool> delay_sstable_sync_;

  // Drop writes on the floor while this is true.
  std::atomic<bool> drop_writes_;

  // Simulate no-space errors while this is true.
  std::atomic<bool> no_space_;

  // Simulate a non-writable file system while this is true.
  std::atomic<bool> non_writable_;

  // Force sync of manifest files to fail while this is true.
  std::atomic<bool> manifest_sync_error_;

  // Force writes to manifest files to fail while this is true.
  std::atomic<bool> manifest_write_error_;

  // Force writes to log files to fail while this is true.
  std::atomic<bool> log_write_error_;

  // Slow down every log write, in micro-seconds.
  std::atomic<int> log_write_slowdown_;

  // Number of WAL files that are still open for write.
  std::atomic<int> num_open_wal_file_;

  bool count_random_reads_;
  uint32_t rand_reads_fail_odd_ = 0;
  std::atomic<uint64_t> num_reads_fails_;
  anon::AtomicCounter random_read_counter_;
  std::atomic<size_t> random_read_bytes_counter_;
  std::atomic<int> random_file_open_counter_;

  bool count_sequential_reads_;
  anon::AtomicCounter sequential_read_counter_;

  anon::AtomicCounter sleep_counter_;

  std::atomic<int64_t> bytes_written_;

  std::atomic<int> sync_counter_;

  // If true, all fsync to files and directories are skipped.
  bool skip_fsync_ = false;

  std::atomic<uint32_t> non_writeable_rate_;

  std::atomic<uint32_t> new_writable_count_;

  std::atomic<uint32_t> non_writable_count_;

  std::function<void()>* table_write_callback_;

  std::atomic<int> now_cpu_count_;

  std::atomic<int> delete_count_;

  std::atomic<bool> is_wal_sync_thread_safe_{true};

  std::atomic<size_t> compaction_readahead_size_{};

 private:  // accessing these directly is prone to error
  friend class DBTestBase;

  std::atomic<int64_t> addon_microseconds_{0};

  // Do not modify in the env of a running DB (could cause deadlock)
  std::atomic<bool> time_elapse_only_sleep_;

  bool no_slowdown_;
};
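
// A hedged usage sketch (values are illustrative, not prescriptive): the test
// harness owns a SpecialEnv wrapping the real Env, and individual tests flip
// its atomics to inject faults or count I/O. For example, a test that wants
// every sstable write to fail with "no space" might do:
//
//   Options options = CurrentOptions();
//   options.env = env_;  // the SpecialEnv* owned by DBTestBase
//   Reopen(options);
//   env_->no_space_.store(true, std::memory_order_release);
//   // ... flushes/writes now see Status::NoSpace from SSTableFile ...
//   env_->no_space_.store(false, std::memory_order_release);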

#ifndef ROCKSDB_LITE
class OnFileDeletionListener : public EventListener {
 public:
  OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {}

  void SetExpectedFileName(const std::string file_name) {
    expected_file_name_ = file_name;
  }

  void VerifyMatchedCount(size_t expected_value) {
    ASSERT_EQ(matched_count_, expected_value);
  }

  void OnTableFileDeleted(const TableFileDeletionInfo& info) override {
    if (expected_file_name_ != "") {
      ASSERT_EQ(expected_file_name_, info.file_path);
      expected_file_name_ = "";
      matched_count_++;
    }
  }

 private:
  size_t matched_count_;
  std::string expected_file_name_;
};
#endif
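
// A hedged usage sketch: the listener is registered through
// Options::listeners before (re)opening the DB. The file name below is
// purely illustrative.
//
//   auto listener = std::make_shared<OnFileDeletionListener>();
//   Options options = CurrentOptions();
//   options.listeners.push_back(listener);
//   Reopen(options);
//   listener->SetExpectedFileName(dbname_ + "/000010.sst");  // hypothetical
//   // ... trigger a compaction that obsoletes and deletes that file ...
//   listener->VerifyMatchedCount(1);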

// A test merge operator that mimics Put but fails if one of the merge
// operands is "corrupted".
class TestPutOperator : public MergeOperator {
 public:
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override {
    if (merge_in.existing_value != nullptr &&
        *(merge_in.existing_value) == "corrupted") {
      return false;
    }
    for (auto value : merge_in.operand_list) {
      if (value == "corrupted") {
        return false;
      }
    }
    merge_out->existing_operand = merge_in.operand_list.back();
    return true;
  }

  virtual const char* Name() const override { return "TestPutOperator"; }
};
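
// A hedged usage sketch: install the operator via Options::merge_operator
// and drive it with Merge(); the keys and values are illustrative only.
//
//   Options options = CurrentOptions();
//   options.merge_operator.reset(new TestPutOperator());
//   Reopen(options);
//   ASSERT_OK(Merge("k1", "v1"));  // reads back as if Put("k1", "v1")
//   ASSERT_OK(Merge("k1", "corrupted"));
//   // The write above succeeds; a later read or compaction that has to
//   // merge the "corrupted" operand will report a merge failure.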

// A wrapper around Cache that can easily be extended with instrumentation,
// etc.
class CacheWrapper : public Cache {
 public:
  explicit CacheWrapper(std::shared_ptr<Cache> target)
      : target_(std::move(target)) {}

  const char* Name() const override { return target_->Name(); }

  Status Insert(const Slice& key, void* value, size_t charge,
                void (*deleter)(const Slice& key, void* value),
                Handle** handle = nullptr,
                Priority priority = Priority::LOW) override {
    return target_->Insert(key, value, charge, deleter, handle, priority);
  }

  Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
    return target_->Lookup(key, stats);
  }

  bool Ref(Handle* handle) override { return target_->Ref(handle); }

  bool Release(Handle* handle, bool force_erase = false) override {
    return target_->Release(handle, force_erase);
  }

  void* Value(Handle* handle) override { return target_->Value(handle); }

  void Erase(const Slice& key) override { target_->Erase(key); }
  uint64_t NewId() override { return target_->NewId(); }

  void SetCapacity(size_t capacity) override { target_->SetCapacity(capacity); }

  void SetStrictCapacityLimit(bool strict_capacity_limit) override {
    target_->SetStrictCapacityLimit(strict_capacity_limit);
  }

  bool HasStrictCapacityLimit() const override {
    return target_->HasStrictCapacityLimit();
  }

  size_t GetCapacity() const override { return target_->GetCapacity(); }

  size_t GetUsage() const override { return target_->GetUsage(); }

  size_t GetUsage(Handle* handle) const override {
    return target_->GetUsage(handle);
  }

  size_t GetPinnedUsage() const override { return target_->GetPinnedUsage(); }

  size_t GetCharge(Handle* handle) const override {
    return target_->GetCharge(handle);
  }

  void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
                              bool thread_safe) override {
    target_->ApplyToAllCacheEntries(callback, thread_safe);
  }

  void EraseUnRefEntries() override { target_->EraseUnRefEntries(); }

 protected:
  std::shared_ptr<Cache> target_;
};
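
// A hedged sketch of how a test might extend CacheWrapper with
// instrumentation; `LookupCountingCache` is a hypothetical name, not part of
// this header.
//
//   class LookupCountingCache : public CacheWrapper {
//    public:
//     using CacheWrapper::CacheWrapper;
//     Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
//       ++lookup_count_;
//       return CacheWrapper::Lookup(key, stats);
//     }
//     std::atomic<size_t> lookup_count_{0};
//   };
//
//   BlockBasedTableOptions table_options;
//   auto cache = std::make_shared<LookupCountingCache>(NewLRUCache(1 << 20));
//   table_options.block_cache = cache;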

class DBTestBase : public testing::Test {
 public:
  // Sequence of option configurations to try
  enum OptionConfig : int {
    kDefault = 0,
    kBlockBasedTableWithPrefixHashIndex = 1,
    kBlockBasedTableWithWholeKeyHashIndex = 2,
    kPlainTableFirstBytePrefix = 3,
    kPlainTableCappedPrefix = 4,
    kPlainTableCappedPrefixNonMmap = 5,
    kPlainTableAllBytesPrefix = 6,
    kVectorRep = 7,
    kHashLinkList = 8,
    kMergePut = 9,
    kFilter = 10,
    kFullFilterWithNewTableReaderForCompactions = 11,
    kUncompressed = 12,
    kNumLevel_3 = 13,
    kDBLogDir = 14,
    kWalDirAndMmapReads = 15,
    kManifestFileSize = 16,
    kPerfOptions = 17,
    kHashSkipList = 18,
    kUniversalCompaction = 19,
    kUniversalCompactionMultiLevel = 20,
    kCompressedBlockCache = 21,
    kInfiniteMaxOpenFiles = 22,
    kxxHashChecksum = 23,
    kFIFOCompaction = 24,
    kOptimizeFiltersForHits = 25,
    kRowCache = 26,
    kRecycleLogFiles = 27,
    kConcurrentSkipList = 28,
    kPipelinedWrite = 29,
    kConcurrentWALWrites = 30,
    kDirectIO,
    kLevelSubcompactions,
    kBlockBasedTableWithIndexRestartInterval,
    kBlockBasedTableWithPartitionedIndex,
    kBlockBasedTableWithPartitionedIndexFormat4,
    kPartitionedFilterWithNewTableReaderForCompactions,
    kUniversalSubcompactions,
    kxxHash64Checksum,
    kUnorderedWrite,
    // This must be the last line
    kEnd,
  };

 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  std::string alternative_db_log_dir_;
  MockEnv* mem_env_;
  Env* encrypted_env_;
  SpecialEnv* env_;
  std::shared_ptr<Env> env_guard_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;

  int option_config_;
  Options last_options_;

  // Skip some options, as they may not be applicable to a specific test.
  // To add more skip constants, use values 4, 8, 16, etc.
  enum OptionSkip {
    kNoSkip = 0,
    kSkipDeletesFilterFirst = 1,
    kSkipUniversalCompaction = 2,
    kSkipMergePut = 4,
    kSkipPlainTable = 8,
    kSkipHashIndex = 16,
    kSkipNoSeekToLast = 32,
    kSkipFIFOCompaction = 128,
    kSkipMmapReads = 256,
  };

  const int kRangeDelSkipConfigs =
      // Plain tables do not support range deletions.
      kSkipPlainTable |
      // MmapReads disables the iterator pinning that RangeDelAggregator
      // requires.
      kSkipMmapReads;

  // `env_do_fsync` decides whether the special Env would do real
  // fsync for files and directories. Skipping fsync can speed up
  // tests, but won't cover the exact fsync logic.
  DBTestBase(const std::string path, bool env_do_fsync);

  ~DBTestBase();

  static std::string Key(int i) {
    char buf[100];
    snprintf(buf, sizeof(buf), "key%06d", i);
    return std::string(buf);
  }

  static bool ShouldSkipOptions(int option_config, int skip_mask = kNoSkip);

  // Switch to a fresh database with the next option configuration to
  // test. Return false if there are no more configurations to test.
  bool ChangeOptions(int skip_mask = kNoSkip);
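
  // A hedged usage sketch: tests typically run a body once per option
  // configuration, optionally skipping families of configurations via the
  // OptionSkip bit mask defined above.
  //
  //   do {
  //     Options options = CurrentOptions();
  //     DestroyAndReopen(options);
  //     // ... test body ...
  //   } while (ChangeOptions(kSkipPlainTable | kSkipMmapReads));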

  // Switch between different compaction styles.
  bool ChangeCompactOptions();

  // Switch between different WAL-related options.
  bool ChangeWalOptions();

  // Switch between different filter policies.
  // Jump from kDefault to kFilter to kFullFilter.
  bool ChangeFilterOptions();

  // Switch between different DB options for file ingestion tests.
  bool ChangeOptionsForFileIngestionTest();

  // Return the current option configuration.
  Options CurrentOptions(const anon::OptionsOverride& options_override =
                             anon::OptionsOverride()) const;

  Options CurrentOptions(const Options& default_options,
                         const anon::OptionsOverride& options_override =
                             anon::OptionsOverride()) const;

  Options GetDefaultOptions() const;

  Options GetOptions(int option_config) const {
    return GetOptions(option_config, GetDefaultOptions());
  }

  Options GetOptions(int option_config, const Options& default_options,
                     const anon::OptionsOverride& options_override =
                         anon::OptionsOverride()) const;

  DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options);

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options);

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options);

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options);

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const std::vector<Options>& options);

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options);

  void Reopen(const Options& options);

  void Close();

  void DestroyAndReopen(const Options& options);

  void Destroy(const Options& options, bool delete_cf_paths = false);

  Status ReadOnlyReopen(const Options& options);

  Status TryReopen(const Options& options);

  bool IsDirectIOSupported();

  bool IsMemoryMappedAccessSupported() const;

  Status Flush(int cf = 0);

  Status Flush(const std::vector<int>& cf_ids);

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions());

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions());

  Status Merge(const Slice& k, const Slice& v,
               WriteOptions wo = WriteOptions());

  Status Merge(int cf, const Slice& k, const Slice& v,
               WriteOptions wo = WriteOptions());

  Status Delete(const std::string& k);

  Status Delete(int cf, const std::string& k);

  Status SingleDelete(const std::string& k);

  Status SingleDelete(int cf, const std::string& k);

  bool SetPreserveDeletesSequenceNumber(SequenceNumber sn);

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr);

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr);

  Status Get(const std::string& k, PinnableSlice* v);

  std::vector<std::string> MultiGet(std::vector<int> cfs,
                                    const std::vector<std::string>& k,
                                    const Snapshot* snapshot,
                                    const bool batched);

  std::vector<std::string> MultiGet(const std::vector<std::string>& k,
                                    const Snapshot* snapshot = nullptr);

  uint64_t GetNumSnapshots();

  uint64_t GetTimeOldestSnapshots();

  uint64_t GetSequenceOldestSnapshots();

  // Return a string that contains all key,value pairs in order,
  // formatted like "(k1->v1)(k2->v2)".
  std::string Contents(int cf = 0);

  std::string AllEntriesFor(const Slice& user_key, int cf = 0);

#ifndef ROCKSDB_LITE
  int NumSortedRuns(int cf = 0);

  uint64_t TotalSize(int cf = 0);

  uint64_t SizeAtLevel(int level);

  size_t TotalLiveFiles(int cf = 0);

  size_t CountLiveFiles();

  int NumTableFilesAtLevel(int level, int cf = 0);

  double CompressionRatioAtLevel(int level, int cf = 0);

  int TotalTableFiles(int cf = 0, int levels = -1);
#endif  // ROCKSDB_LITE

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0);

  size_t CountFiles();

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0);

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id);

  void Compact(int cf, const Slice& start, const Slice& limit);

  void Compact(const Slice& start, const Slice& limit);

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0);

  // Prevent pushing of new sstables into deeper levels by adding
  // tables that cover a specified range to all levels.
  void FillLevels(const std::string& smallest, const std::string& largest,
                  int cf);

  void MoveFilesToLevel(int level, int cf = 0);

#ifndef ROCKSDB_LITE
  void DumpFileCounts(const char* label);
#endif  // ROCKSDB_LITE

  std::string DumpSSTableList();

  static void GetSstFiles(Env* env, std::string path,
                          std::vector<std::string>* files);

  int GetSstFileCount(std::string path);

  // This will generate non-overlapping files since it keeps increasing key_idx
  void GenerateNewFile(Random* rnd, int* key_idx, bool nowait = false);

  void GenerateNewFile(int fd, Random* rnd, int* key_idx, bool nowait = false);

  static const int kNumKeysByGenerateNewRandomFile;
  static const int KNumKeysByGenerateNewFile = 100;

  void GenerateNewRandomFile(Random* rnd, bool nowait = false);

  std::string IterStatus(Iterator* iter);

  Options OptionsForLogIterTest();

  std::string DummyString(size_t len, char c = 'a');

  void VerifyIterLast(std::string expected_key, int cf = 0);

  // Used to test InplaceUpdate

  // If the previous value is nullptr or delta is larger than the previous
  // value, sets newValue to delta.
  // If the previous value is not empty, updates the previous value in place
  // with a string of 'b' characters of length (previous value size - 1).
  static UpdateStatus updateInPlaceSmallerSize(char* prevValue,
                                               uint32_t* prevSize, Slice delta,
                                               std::string* newValue);

  static UpdateStatus updateInPlaceSmallerVarintSize(char* prevValue,
                                                     uint32_t* prevSize,
                                                     Slice delta,
                                                     std::string* newValue);

  static UpdateStatus updateInPlaceLargerSize(char* prevValue,
                                              uint32_t* prevSize, Slice delta,
                                              std::string* newValue);

  static UpdateStatus updateInPlaceNoAction(char* prevValue, uint32_t* prevSize,
                                            Slice delta, std::string* newValue);
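
  // A hedged usage sketch: these callbacks are meant to be assigned to
  // Options::inplace_callback together with inplace_update_support. The
  // option values and keys below are illustrative only, and whether an
  // update is actually applied in place depends on memtable state.
  //
  //   Options options = CurrentOptions();
  //   options.inplace_update_support = true;
  //   options.inplace_callback = DBTestBase::updateInPlaceSmallerSize;
  //   Reopen(options);
  //   ASSERT_OK(Put("key", "a_fairly_long_initial_value"));
  //   ASSERT_OK(Put("key", "delta"));  // may be merged in place
  //   validateNumberOfEntries(1);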

  // Utility method to test InplaceUpdate
  void validateNumberOfEntries(int numValues, int cf = 0);

  void CopyFile(const std::string& source, const std::string& destination,
                uint64_t size = 0);

  Status GetAllSSTFiles(std::unordered_map<std::string, uint64_t>* sst_files,
                        uint64_t* total_size = nullptr);

  std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path);

  void VerifyDBFromMap(
      std::map<std::string, std::string> true_data,
      size_t* total_reads_res = nullptr, bool tailing_iter = false,
      std::map<std::string, Status> status = std::map<std::string, Status>());

  void VerifyDBInternal(
      std::vector<std::pair<std::string, std::string>> true_data);

#ifndef ROCKSDB_LITE
  uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
                                              std::string column_family_name);
#endif  // ROCKSDB_LITE

  uint64_t TestGetTickerCount(const Options& options, Tickers ticker_type) {
    return options.statistics->getTickerCount(ticker_type);
  }

  uint64_t TestGetAndResetTickerCount(const Options& options,
                                      Tickers ticker_type) {
    return options.statistics->getAndResetTickerCount(ticker_type);
  }

  // Note: reverting this setting within the same test run is not yet
  // supported
  void SetTimeElapseOnlySleepOnReopen(DBOptions* options);

 private:  // Prone to error on direct use
  void MaybeInstallTimeElapseOnlySleep(const DBOptions& options);

  bool time_elapse_only_sleep_on_reopen_ = false;
};
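
// A hedged sketch of a typical test fixture built on DBTestBase; the fixture
// name, path, keys, and values are illustrative only.
//
//   class MyDBTest : public DBTestBase {
//    public:
//     MyDBTest() : DBTestBase("/my_db_test", /*env_do_fsync=*/true) {}
//   };
//
//   TEST_F(MyDBTest, PutFlushGet) {
//     ASSERT_OK(Put(Key(0), "value"));
//     ASSERT_OK(Flush());
//     ASSERT_EQ("value", Get(Key(0)));
//   }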

}  // namespace ROCKSDB_NAMESPACE