1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
21 #include <unordered_set>
25 #include "db/db_impl/db_impl.h"
26 #include "file/filename.h"
27 #include "rocksdb/advanced_options.h"
28 #include "rocksdb/cache.h"
29 #include "rocksdb/compaction_filter.h"
30 #include "rocksdb/convenience.h"
31 #include "rocksdb/db.h"
32 #include "rocksdb/env.h"
33 #include "rocksdb/file_system.h"
34 #include "rocksdb/filter_policy.h"
35 #include "rocksdb/io_status.h"
36 #include "rocksdb/options.h"
37 #include "rocksdb/slice.h"
38 #include "rocksdb/sst_file_writer.h"
39 #include "rocksdb/statistics.h"
40 #include "rocksdb/table.h"
41 #include "rocksdb/utilities/checkpoint.h"
42 #include "table/mock_table.h"
43 #include "table/scoped_arena_iterator.h"
44 #include "test_util/sync_point.h"
45 #include "test_util/testharness.h"
46 #include "util/cast_util.h"
47 #include "util/compression.h"
48 #include "util/mutexlock.h"
49 #include "util/string_util.h"
50 #include "utilities/merge_operators.h"
52 namespace ROCKSDB_NAMESPACE
{
58 explicit AtomicCounter(Env
* env
= NULL
)
59 : env_(env
), cond_count_(&mu_
), count_(0) {}
64 cond_count_
.SignalAll();
72 bool WaitFor(int count
) {
75 uint64_t start
= env_
->NowMicros();
76 while (count_
< count
) {
77 uint64_t now
= env_
->NowMicros();
78 cond_count_
.TimedWait(now
+ /*1s*/ 1 * 1000 * 1000);
79 if (env_
->NowMicros() - start
> /*10s*/ 10 * 1000 * 1000) {
83 GTEST_LOG_(WARNING
) << "WaitFor is taking more time than usual";
93 cond_count_
.SignalAll();
99 port::CondVar cond_count_
;
103 struct OptionsOverride
{
104 std::shared_ptr
<const FilterPolicy
> filter_policy
= nullptr;
105 // These will be used only if filter_policy is set
106 bool partition_filters
= false;
107 // Force using a default block cache. (Setting to false allows ASAN build
108 // use a trivially small block cache for better UAF error detection.)
109 bool full_block_cache
= false;
110 uint64_t metadata_block_size
= 1024;
112 // Used as a bit mask of individual enums in which to skip an XF test point
118 enum SkipPolicy
{ kSkipNone
= 0, kSkipNoSnapshot
= 1, kSkipNoPrefix
= 2 };
120 // Special Env used to delay background operations
121 class SpecialEnv
: public EnvWrapper
{
123 explicit SpecialEnv(Env
* base
, bool time_elapse_only_sleep
= false);
125 static const char* kClassName() { return "SpecialEnv"; }
126 const char* Name() const override
{ return kClassName(); }
128 Status
NewWritableFile(const std::string
& f
, std::unique_ptr
<WritableFile
>* r
,
129 const EnvOptions
& soptions
) override
{
130 class SSTableFile
: public WritableFile
{
133 std::unique_ptr
<WritableFile
> base_
;
136 SSTableFile(SpecialEnv
* env
, std::unique_ptr
<WritableFile
>&& base
)
137 : env_(env
), base_(std::move(base
)) {}
138 Status
Append(const Slice
& data
) override
{
139 if (env_
->table_write_callback_
) {
140 (*env_
->table_write_callback_
)();
142 if (env_
->drop_writes_
.load(std::memory_order_acquire
)) {
143 // Drop writes on the floor
145 } else if (env_
->no_space_
.load(std::memory_order_acquire
)) {
146 return Status::NoSpace("No space left on device");
148 env_
->bytes_written_
+= data
.size();
149 return base_
->Append(data
);
154 const DataVerificationInfo
& /* verification_info */) override
{
157 Status
PositionedAppend(const Slice
& data
, uint64_t offset
) override
{
158 if (env_
->table_write_callback_
) {
159 (*env_
->table_write_callback_
)();
161 if (env_
->drop_writes_
.load(std::memory_order_acquire
)) {
162 // Drop writes on the floor
164 } else if (env_
->no_space_
.load(std::memory_order_acquire
)) {
165 return Status::NoSpace("No space left on device");
167 env_
->bytes_written_
+= data
.size();
168 return base_
->PositionedAppend(data
, offset
);
171 Status
PositionedAppend(
172 const Slice
& data
, uint64_t offset
,
173 const DataVerificationInfo
& /* verification_info */) override
{
174 return PositionedAppend(data
, offset
);
176 Status
Truncate(uint64_t size
) override
{ return base_
->Truncate(size
); }
177 Status
RangeSync(uint64_t offset
, uint64_t nbytes
) override
{
178 Status s
= base_
->RangeSync(offset
, nbytes
);
179 #if !(defined NDEBUG) || !defined(OS_WIN)
180 TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s
);
181 #endif // !(defined NDEBUG) || !defined(OS_WIN)
184 Status
Close() override
{
185 // SyncPoint is not supported in Released Windows Mode.
186 #if !(defined NDEBUG) || !defined(OS_WIN)
187 // Check preallocation size
188 // preallocation size is never passed to base file.
189 size_t preallocation_size
= preallocation_block_size();
190 TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
191 &preallocation_size
);
192 #endif // !(defined NDEBUG) || !defined(OS_WIN)
193 Status s
= base_
->Close();
194 #if !(defined NDEBUG) || !defined(OS_WIN)
195 TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s
);
196 #endif // !(defined NDEBUG) || !defined(OS_WIN)
199 Status
Flush() override
{ return base_
->Flush(); }
200 Status
Sync() override
{
201 ++env_
->sync_counter_
;
202 while (env_
->delay_sstable_sync_
.load(std::memory_order_acquire
)) {
203 env_
->SleepForMicroseconds(100000);
206 if (!env_
->skip_fsync_
) {
209 #if !(defined NDEBUG) || !defined(OS_WIN)
210 TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s
);
211 #endif // !(defined NDEBUG) || !defined(OS_WIN)
214 void SetIOPriority(Env::IOPriority pri
) override
{
215 base_
->SetIOPriority(pri
);
217 Env::IOPriority
GetIOPriority() override
{
218 return base_
->GetIOPriority();
220 bool use_direct_io() const override
{ return base_
->use_direct_io(); }
221 Status
Allocate(uint64_t offset
, uint64_t len
) override
{
222 return base_
->Allocate(offset
, len
);
224 size_t GetUniqueId(char* id
, size_t max_size
) const override
{
225 return base_
->GetUniqueId(id
, max_size
);
228 class ManifestFile
: public WritableFile
{
230 ManifestFile(SpecialEnv
* env
, std::unique_ptr
<WritableFile
>&& b
)
231 : env_(env
), base_(std::move(b
)) {}
232 Status
Append(const Slice
& data
) override
{
233 if (env_
->manifest_write_error_
.load(std::memory_order_acquire
)) {
234 return Status::IOError("simulated writer error");
236 return base_
->Append(data
);
241 const DataVerificationInfo
& /*verification_info*/) override
{
245 Status
Truncate(uint64_t size
) override
{ return base_
->Truncate(size
); }
246 Status
Close() override
{ return base_
->Close(); }
247 Status
Flush() override
{ return base_
->Flush(); }
248 Status
Sync() override
{
249 ++env_
->sync_counter_
;
250 if (env_
->manifest_sync_error_
.load(std::memory_order_acquire
)) {
251 return Status::IOError("simulated sync error");
253 if (env_
->skip_fsync_
) {
256 return base_
->Sync();
260 uint64_t GetFileSize() override
{ return base_
->GetFileSize(); }
261 Status
Allocate(uint64_t offset
, uint64_t len
) override
{
262 return base_
->Allocate(offset
, len
);
267 std::unique_ptr
<WritableFile
> base_
;
269 class WalFile
: public WritableFile
{
271 WalFile(SpecialEnv
* env
, std::unique_ptr
<WritableFile
>&& b
)
272 : env_(env
), base_(std::move(b
)) {
273 env_
->num_open_wal_file_
.fetch_add(1);
275 virtual ~WalFile() { env_
->num_open_wal_file_
.fetch_add(-1); }
276 Status
Append(const Slice
& data
) override
{
277 #if !(defined NDEBUG) || !defined(OS_WIN)
278 TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
281 if (env_
->log_write_error_
.load(std::memory_order_acquire
)) {
282 s
= Status::IOError("simulated writer error");
285 env_
->log_write_slowdown_
.load(std::memory_order_acquire
);
287 env_
->SleepForMicroseconds(slowdown
);
289 s
= base_
->Append(data
);
291 #if !(defined NDEBUG) || !defined(OS_WIN)
292 TEST_SYNC_POINT("SpecialEnv::WalFile::Append:2");
298 const DataVerificationInfo
& /* verification_info */) override
{
301 Status
Truncate(uint64_t size
) override
{ return base_
->Truncate(size
); }
302 void PrepareWrite(size_t offset
, size_t len
) override
{
303 base_
->PrepareWrite(offset
, len
);
305 void SetPreallocationBlockSize(size_t size
) override
{
306 base_
->SetPreallocationBlockSize(size
);
308 Status
Close() override
{
309 // SyncPoint is not supported in Released Windows Mode.
310 #if !(defined NDEBUG) || !defined(OS_WIN)
311 // Check preallocation size
312 size_t block_size
, last_allocated_block
;
313 base_
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
314 TEST_SYNC_POINT_CALLBACK("DBTestWalFile.GetPreallocationStatus",
316 #endif // !(defined NDEBUG) || !defined(OS_WIN)
318 return base_
->Close();
320 Status
Flush() override
{ return base_
->Flush(); }
321 Status
Sync() override
{
322 ++env_
->sync_counter_
;
323 if (env_
->corrupt_in_sync_
) {
324 EXPECT_OK(Append(std::string(33000, ' ')));
325 return Status::IOError("Ingested Sync Failure");
327 if (env_
->skip_fsync_
) {
330 return base_
->Sync();
333 bool IsSyncThreadSafe() const override
{
334 return env_
->is_wal_sync_thread_safe_
.load();
336 Status
Allocate(uint64_t offset
, uint64_t len
) override
{
337 return base_
->Allocate(offset
, len
);
342 std::unique_ptr
<WritableFile
> base_
;
344 class OtherFile
: public WritableFile
{
346 OtherFile(SpecialEnv
* env
, std::unique_ptr
<WritableFile
>&& b
)
347 : env_(env
), base_(std::move(b
)) {}
348 Status
Append(const Slice
& data
) override
{ return base_
->Append(data
); }
351 const DataVerificationInfo
& /*verification_info*/) override
{
354 Status
Truncate(uint64_t size
) override
{ return base_
->Truncate(size
); }
355 Status
Close() override
{ return base_
->Close(); }
356 Status
Flush() override
{ return base_
->Flush(); }
357 Status
Sync() override
{
358 if (env_
->skip_fsync_
) {
361 return base_
->Sync();
364 uint64_t GetFileSize() override
{ return base_
->GetFileSize(); }
365 Status
Allocate(uint64_t offset
, uint64_t len
) override
{
366 return base_
->Allocate(offset
, len
);
371 std::unique_ptr
<WritableFile
> base_
;
374 if (no_file_overwrite_
.load(std::memory_order_acquire
) &&
375 target()->FileExists(f
).ok()) {
376 return Status::NotSupported("SpecialEnv::no_file_overwrite_ is true.");
379 if (non_writeable_rate_
.load(std::memory_order_acquire
) > 0) {
380 uint32_t random_number
;
382 MutexLock
l(&rnd_mutex_
);
383 random_number
= rnd_
.Uniform(100);
385 if (random_number
< non_writeable_rate_
.load()) {
386 return Status::IOError("simulated random write error");
390 new_writable_count_
++;
392 if (non_writable_count_
.load() > 0) {
393 non_writable_count_
--;
394 return Status::IOError("simulated write error");
397 EnvOptions optimized
= soptions
;
398 if (strstr(f
.c_str(), "MANIFEST") != nullptr ||
399 strstr(f
.c_str(), "log") != nullptr) {
400 optimized
.use_mmap_writes
= false;
401 optimized
.use_direct_writes
= false;
404 Status s
= target()->NewWritableFile(f
, r
, optimized
);
406 if (strstr(f
.c_str(), ".sst") != nullptr) {
407 r
->reset(new SSTableFile(this, std::move(*r
)));
408 } else if (strstr(f
.c_str(), "MANIFEST") != nullptr) {
409 r
->reset(new ManifestFile(this, std::move(*r
)));
410 } else if (strstr(f
.c_str(), "log") != nullptr) {
411 r
->reset(new WalFile(this, std::move(*r
)));
413 r
->reset(new OtherFile(this, std::move(*r
)));
419 Status
NewRandomAccessFile(const std::string
& f
,
420 std::unique_ptr
<RandomAccessFile
>* r
,
421 const EnvOptions
& soptions
) override
{
422 class CountingFile
: public RandomAccessFile
{
424 CountingFile(std::unique_ptr
<RandomAccessFile
>&& target
,
425 anon::AtomicCounter
* counter
,
426 std::atomic
<size_t>* bytes_read
)
427 : target_(std::move(target
)),
429 bytes_read_(bytes_read
) {}
430 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
431 char* scratch
) const override
{
432 counter_
->Increment();
433 Status s
= target_
->Read(offset
, n
, result
, scratch
);
434 *bytes_read_
+= result
->size();
438 virtual Status
Prefetch(uint64_t offset
, size_t n
) override
{
439 Status s
= target_
->Prefetch(offset
, n
);
445 std::unique_ptr
<RandomAccessFile
> target_
;
446 anon::AtomicCounter
* counter_
;
447 std::atomic
<size_t>* bytes_read_
;
450 class RandomFailureFile
: public RandomAccessFile
{
452 RandomFailureFile(std::unique_ptr
<RandomAccessFile
>&& target
,
453 std::atomic
<uint64_t>* failure_cnt
, uint32_t fail_odd
)
454 : target_(std::move(target
)),
455 fail_cnt_(failure_cnt
),
456 fail_odd_(fail_odd
) {}
457 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
458 char* scratch
) const override
{
459 if (Random::GetTLSInstance()->OneIn(fail_odd_
)) {
460 fail_cnt_
->fetch_add(1);
461 return Status::IOError("random error");
463 return target_
->Read(offset
, n
, result
, scratch
);
466 virtual Status
Prefetch(uint64_t offset
, size_t n
) override
{
467 return target_
->Prefetch(offset
, n
);
471 std::unique_ptr
<RandomAccessFile
> target_
;
472 std::atomic
<uint64_t>* fail_cnt_
;
476 Status s
= target()->NewRandomAccessFile(f
, r
, soptions
);
477 random_file_open_counter_
++;
479 if (count_random_reads_
) {
480 r
->reset(new CountingFile(std::move(*r
), &random_read_counter_
,
481 &random_read_bytes_counter_
));
482 } else if (rand_reads_fail_odd_
> 0) {
483 r
->reset(new RandomFailureFile(std::move(*r
), &num_reads_fails_
,
484 rand_reads_fail_odd_
));
488 if (s
.ok() && soptions
.compaction_readahead_size
> 0) {
489 compaction_readahead_size_
= soptions
.compaction_readahead_size
;
494 virtual Status
NewSequentialFile(const std::string
& f
,
495 std::unique_ptr
<SequentialFile
>* r
,
496 const EnvOptions
& soptions
) override
{
497 class CountingFile
: public SequentialFile
{
499 CountingFile(std::unique_ptr
<SequentialFile
>&& target
,
500 anon::AtomicCounter
* counter
)
501 : target_(std::move(target
)), counter_(counter
) {}
502 virtual Status
Read(size_t n
, Slice
* result
, char* scratch
) override
{
503 counter_
->Increment();
504 return target_
->Read(n
, result
, scratch
);
506 virtual Status
Skip(uint64_t n
) override
{ return target_
->Skip(n
); }
509 std::unique_ptr
<SequentialFile
> target_
;
510 anon::AtomicCounter
* counter_
;
513 Status s
= target()->NewSequentialFile(f
, r
, soptions
);
514 if (s
.ok() && count_sequential_reads_
) {
515 r
->reset(new CountingFile(std::move(*r
), &sequential_read_counter_
));
520 virtual void SleepForMicroseconds(int micros
) override
{
521 sleep_counter_
.Increment();
522 if (no_slowdown_
|| time_elapse_only_sleep_
) {
523 addon_microseconds_
.fetch_add(micros
);
526 target()->SleepForMicroseconds(micros
);
530 void MockSleepForMicroseconds(int64_t micros
) {
531 sleep_counter_
.Increment();
532 assert(no_slowdown_
);
533 addon_microseconds_
.fetch_add(micros
);
536 void MockSleepForSeconds(int64_t seconds
) {
537 sleep_counter_
.Increment();
538 assert(no_slowdown_
);
539 addon_microseconds_
.fetch_add(seconds
* 1000000);
542 virtual Status
GetCurrentTime(int64_t* unix_time
) override
{
544 if (time_elapse_only_sleep_
) {
545 *unix_time
= maybe_starting_time_
;
547 s
= target()->GetCurrentTime(unix_time
);
550 // mock microseconds elapsed to seconds of time
551 *unix_time
+= addon_microseconds_
.load() / 1000000;
556 virtual uint64_t NowCPUNanos() override
{
557 now_cpu_count_
.fetch_add(1);
558 return target()->NowCPUNanos();
561 virtual uint64_t NowNanos() override
{
562 return (time_elapse_only_sleep_
? 0 : target()->NowNanos()) +
563 addon_microseconds_
.load() * 1000;
566 virtual uint64_t NowMicros() override
{
567 return (time_elapse_only_sleep_
? 0 : target()->NowMicros()) +
568 addon_microseconds_
.load();
571 virtual Status
DeleteFile(const std::string
& fname
) override
{
572 delete_count_
.fetch_add(1);
573 return target()->DeleteFile(fname
);
576 void SetMockSleep(bool enabled
= true) { no_slowdown_
= enabled
; }
578 Status
NewDirectory(const std::string
& name
,
579 std::unique_ptr
<Directory
>* result
) override
{
581 return target()->NewDirectory(name
, result
);
583 class NoopDirectory
: public Directory
{
588 Status
Fsync() override
{ return Status::OK(); }
589 Status
Close() override
{ return Status::OK(); }
592 result
->reset(new NoopDirectory());
597 Status
RenameFile(const std::string
& src
, const std::string
& dest
) override
{
598 rename_count_
.fetch_add(1);
599 if (rename_error_
.load(std::memory_order_acquire
)) {
600 return Status::NotSupported("Simulated `RenameFile()` error.");
602 return target()->RenameFile(src
, dest
);
605 // Something to return when mocking current time
606 const int64_t maybe_starting_time_
;
609 port::Mutex rnd_mutex_
; // Lock to pretect rnd_
611 // sstable Sync() calls are blocked while this pointer is non-nullptr.
612 std::atomic
<bool> delay_sstable_sync_
;
614 // Drop writes on the floor while this pointer is non-nullptr.
615 std::atomic
<bool> drop_writes_
;
617 // Simulate no-space errors while this pointer is non-nullptr.
618 std::atomic
<bool> no_space_
;
620 // Simulate non-writable file system while this pointer is non-nullptr
621 std::atomic
<bool> non_writable_
;
623 // Force sync of manifest files to fail while this pointer is non-nullptr
624 std::atomic
<bool> manifest_sync_error_
;
626 // Force write to manifest files to fail while this pointer is non-nullptr
627 std::atomic
<bool> manifest_write_error_
;
629 // Force write to log files to fail while this pointer is non-nullptr
630 std::atomic
<bool> log_write_error_
;
632 // Force `RenameFile()` to fail while this pointer is non-nullptr
633 std::atomic
<bool> rename_error_
{false};
635 // Slow down every log write, in micro-seconds.
636 std::atomic
<int> log_write_slowdown_
;
638 // If true, returns Status::NotSupported for file overwrite.
639 std::atomic
<bool> no_file_overwrite_
;
641 // Number of WAL files that are still open for write.
642 std::atomic
<int> num_open_wal_file_
;
644 bool count_random_reads_
;
645 uint32_t rand_reads_fail_odd_
= 0;
646 std::atomic
<uint64_t> num_reads_fails_
;
647 anon::AtomicCounter random_read_counter_
;
648 std::atomic
<size_t> random_read_bytes_counter_
;
649 std::atomic
<int> random_file_open_counter_
;
651 bool count_sequential_reads_
;
652 anon::AtomicCounter sequential_read_counter_
;
654 anon::AtomicCounter sleep_counter_
;
656 std::atomic
<int64_t> bytes_written_
;
658 std::atomic
<int> sync_counter_
;
660 // If true, all fsync to files and directories are skipped.
661 bool skip_fsync_
= false;
663 // If true, ingest the corruption to file during sync.
664 bool corrupt_in_sync_
= false;
666 std::atomic
<uint32_t> non_writeable_rate_
;
668 std::atomic
<uint32_t> new_writable_count_
;
670 std::atomic
<uint32_t> non_writable_count_
;
672 std::function
<void()>* table_write_callback_
;
674 std::atomic
<int> now_cpu_count_
;
676 std::atomic
<int> delete_count_
;
678 std::atomic
<int> rename_count_
{0};
680 std::atomic
<bool> is_wal_sync_thread_safe_
{true};
682 std::atomic
<size_t> compaction_readahead_size_
{};
684 private: // accessing these directly is prone to error
685 friend class DBTestBase
;
687 std::atomic
<int64_t> addon_microseconds_
{0};
689 // Do not modify in the env of a running DB (could cause deadlock)
690 std::atomic
<bool> time_elapse_only_sleep_
;
696 class FileTemperatureTestFS
: public FileSystemWrapper
{
698 explicit FileTemperatureTestFS(const std::shared_ptr
<FileSystem
>& fs
)
699 : FileSystemWrapper(fs
) {}
701 static const char* kClassName() { return "FileTemperatureTestFS"; }
702 const char* Name() const override
{ return kClassName(); }
704 IOStatus
NewSequentialFile(const std::string
& fname
, const FileOptions
& opts
,
705 std::unique_ptr
<FSSequentialFile
>* result
,
706 IODebugContext
* dbg
) override
{
707 IOStatus s
= target()->NewSequentialFile(fname
, opts
, result
, dbg
);
710 if (ParseFileName(GetFileName(fname
), &number
, &type
) &&
711 type
== kTableFile
) {
712 MutexLock
lock(&mu_
);
713 requested_sst_file_temperatures_
.emplace_back(number
, opts
.temperature
);
715 if (opts
.temperature
!= Temperature::kUnknown
) {
716 // Be extra picky and don't open if a wrong non-unknown temperature is
718 auto e
= current_sst_file_temperatures_
.find(number
);
719 if (e
!= current_sst_file_temperatures_
.end() &&
720 e
->second
!= opts
.temperature
) {
722 return IOStatus::PathNotFound("Temperature mismatch on " + fname
);
725 *result
= WrapWithTemperature
<FSSequentialFileOwnerWrapper
>(
726 number
, std::move(*result
));
732 IOStatus
NewRandomAccessFile(const std::string
& fname
,
733 const FileOptions
& opts
,
734 std::unique_ptr
<FSRandomAccessFile
>* result
,
735 IODebugContext
* dbg
) override
{
736 IOStatus s
= target()->NewRandomAccessFile(fname
, opts
, result
, dbg
);
739 if (ParseFileName(GetFileName(fname
), &number
, &type
) &&
740 type
== kTableFile
) {
741 MutexLock
lock(&mu_
);
742 requested_sst_file_temperatures_
.emplace_back(number
, opts
.temperature
);
744 if (opts
.temperature
!= Temperature::kUnknown
) {
745 // Be extra picky and don't open if a wrong non-unknown temperature is
747 auto e
= current_sst_file_temperatures_
.find(number
);
748 if (e
!= current_sst_file_temperatures_
.end() &&
749 e
->second
!= opts
.temperature
) {
751 return IOStatus::PathNotFound("Temperature mismatch on " + fname
);
754 *result
= WrapWithTemperature
<FSRandomAccessFileOwnerWrapper
>(
755 number
, std::move(*result
));
761 void PopRequestedSstFileTemperatures(
762 std::vector
<std::pair
<uint64_t, Temperature
>>* out
= nullptr) {
763 MutexLock
lock(&mu_
);
765 *out
= std::move(requested_sst_file_temperatures_
);
766 assert(requested_sst_file_temperatures_
.empty());
768 requested_sst_file_temperatures_
.clear();
772 IOStatus
NewWritableFile(const std::string
& fname
, const FileOptions
& opts
,
773 std::unique_ptr
<FSWritableFile
>* result
,
774 IODebugContext
* dbg
) override
{
777 if (ParseFileName(GetFileName(fname
), &number
, &type
) &&
778 type
== kTableFile
) {
779 MutexLock
lock(&mu_
);
780 current_sst_file_temperatures_
[number
] = opts
.temperature
;
782 return target()->NewWritableFile(fname
, opts
, result
, dbg
);
785 void CopyCurrentSstFileTemperatures(std::map
<uint64_t, Temperature
>* out
) {
786 MutexLock
lock(&mu_
);
787 *out
= current_sst_file_temperatures_
;
790 void OverrideSstFileTemperature(uint64_t number
, Temperature temp
) {
791 MutexLock
lock(&mu_
);
792 current_sst_file_temperatures_
[number
] = temp
;
797 std::vector
<std::pair
<uint64_t, Temperature
>>
798 requested_sst_file_temperatures_
;
799 std::map
<uint64_t, Temperature
> current_sst_file_temperatures_
;
801 std::string
GetFileName(const std::string
& fname
) {
802 auto filename
= fname
.substr(fname
.find_last_of(kFilePathSeparator
) + 1);
803 // workaround only for Windows that the file path could contain both Windows
804 // FilePathSeparator and '/'
805 filename
= filename
.substr(filename
.find_last_of('/') + 1);
809 template <class FileOwnerWrapperT
, /*inferred*/ class FileT
>
810 std::unique_ptr
<FileT
> WrapWithTemperature(uint64_t number
,
811 std::unique_ptr
<FileT
>&& t
) {
812 class FileWithTemp
: public FileOwnerWrapperT
{
814 FileWithTemp(FileTemperatureTestFS
* fs
, uint64_t number
,
815 std::unique_ptr
<FileT
>&& t
)
816 : FileOwnerWrapperT(std::move(t
)), fs_(fs
), number_(number
) {}
818 Temperature
GetTemperature() const override
{
819 MutexLock
lock(&fs_
->mu_
);
820 return fs_
->current_sst_file_temperatures_
[number_
];
824 FileTemperatureTestFS
* fs_
;
827 return std::make_unique
<FileWithTemp
>(this, number
, std::move(t
));
831 class OnFileDeletionListener
: public EventListener
{
833 OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {}
834 const char* Name() const override
{ return kClassName(); }
835 static const char* kClassName() { return "OnFileDeletionListener"; }
837 void SetExpectedFileName(const std::string file_name
) {
838 expected_file_name_
= file_name
;
841 void VerifyMatchedCount(size_t expected_value
) {
842 ASSERT_EQ(matched_count_
, expected_value
);
845 void OnTableFileDeleted(const TableFileDeletionInfo
& info
) override
{
846 if (expected_file_name_
!= "") {
847 ASSERT_EQ(expected_file_name_
, info
.file_path
);
848 expected_file_name_
= "";
854 size_t matched_count_
;
855 std::string expected_file_name_
;
858 class FlushCounterListener
: public EventListener
{
860 const char* Name() const override
{ return kClassName(); }
861 static const char* kClassName() { return "FlushCounterListener"; }
862 std::atomic
<int> count
{0};
863 std::atomic
<FlushReason
> expected_flush_reason
{FlushReason::kOthers
};
865 void OnFlushBegin(DB
* /*db*/, const FlushJobInfo
& flush_job_info
) override
{
867 ASSERT_EQ(expected_flush_reason
.load(), flush_job_info
.flush_reason
);
872 // A test merge operator mimics put but also fails if one of merge operands is
874 class TestPutOperator
: public MergeOperator
{
876 virtual bool FullMergeV2(const MergeOperationInput
& merge_in
,
877 MergeOperationOutput
* merge_out
) const override
{
878 if (merge_in
.existing_value
!= nullptr &&
879 *(merge_in
.existing_value
) == "corrupted") {
882 for (auto value
: merge_in
.operand_list
) {
883 if (value
== "corrupted") {
887 merge_out
->existing_operand
= merge_in
.operand_list
.back();
891 virtual const char* Name() const override
{ return "TestPutOperator"; }
894 // A wrapper around Cache that can easily be extended with instrumentation,
896 class CacheWrapper
: public Cache
{
898 explicit CacheWrapper(std::shared_ptr
<Cache
> target
)
899 : target_(std::move(target
)) {}
901 const char* Name() const override
{ return target_
->Name(); }
904 Status
Insert(const Slice
& key
, void* value
, size_t charge
,
905 void (*deleter
)(const Slice
& key
, void* value
),
906 Handle
** handle
= nullptr,
907 Priority priority
= Priority::LOW
) override
{
908 return target_
->Insert(key
, value
, charge
, deleter
, handle
, priority
);
912 Handle
* Lookup(const Slice
& key
, Statistics
* stats
= nullptr) override
{
913 return target_
->Lookup(key
, stats
);
916 bool Ref(Handle
* handle
) override
{ return target_
->Ref(handle
); }
918 using Cache::Release
;
919 bool Release(Handle
* handle
, bool erase_if_last_ref
= false) override
{
920 return target_
->Release(handle
, erase_if_last_ref
);
923 void* Value(Handle
* handle
) override
{ return target_
->Value(handle
); }
925 void Erase(const Slice
& key
) override
{ target_
->Erase(key
); }
926 uint64_t NewId() override
{ return target_
->NewId(); }
928 void SetCapacity(size_t capacity
) override
{ target_
->SetCapacity(capacity
); }
930 void SetStrictCapacityLimit(bool strict_capacity_limit
) override
{
931 target_
->SetStrictCapacityLimit(strict_capacity_limit
);
934 bool HasStrictCapacityLimit() const override
{
935 return target_
->HasStrictCapacityLimit();
938 size_t GetCapacity() const override
{ return target_
->GetCapacity(); }
940 size_t GetUsage() const override
{ return target_
->GetUsage(); }
942 size_t GetUsage(Handle
* handle
) const override
{
943 return target_
->GetUsage(handle
);
946 size_t GetPinnedUsage() const override
{ return target_
->GetPinnedUsage(); }
948 size_t GetCharge(Handle
* handle
) const override
{
949 return target_
->GetCharge(handle
);
952 DeleterFn
GetDeleter(Handle
* handle
) const override
{
953 return target_
->GetDeleter(handle
);
956 void ApplyToAllCacheEntries(void (*callback
)(void*, size_t),
957 bool thread_safe
) override
{
958 target_
->ApplyToAllCacheEntries(callback
, thread_safe
);
961 void ApplyToAllEntries(
962 const std::function
<void(const Slice
& key
, void* value
, size_t charge
,
963 DeleterFn deleter
)>& callback
,
964 const ApplyToAllEntriesOptions
& opts
) override
{
965 target_
->ApplyToAllEntries(callback
, opts
);
968 void EraseUnRefEntries() override
{ target_
->EraseUnRefEntries(); }
971 std::shared_ptr
<Cache
> target_
;
975 * A cache wrapper that tracks certain CacheEntryRole's cache charge, its
976 * peaks and increments
984 * increments = {p1-a, p2-b}
986 template <CacheEntryRole R
>
987 class TargetCacheChargeTrackingCache
: public CacheWrapper
{
989 explicit TargetCacheChargeTrackingCache(std::shared_ptr
<Cache
> target
);
992 Status
Insert(const Slice
& key
, void* value
, size_t charge
,
993 void (*deleter
)(const Slice
& key
, void* value
),
994 Handle
** handle
= nullptr,
995 Priority priority
= Priority::LOW
) override
;
997 using Cache::Release
;
998 bool Release(Handle
* handle
, bool erase_if_last_ref
= false) override
;
1000 std::size_t GetCacheCharge() { return cur_cache_charge_
; }
1002 std::deque
<std::size_t> GetChargedCachePeaks() { return cache_charge_peaks_
; }
1004 std::size_t GetChargedCacheIncrementSum() {
1005 return cache_charge_increments_sum_
;
1009 static const Cache::DeleterFn kNoopDeleter
;
1011 std::size_t cur_cache_charge_
;
1012 std::size_t cache_charge_peak_
;
1013 std::size_t cache_charge_increment_
;
1014 bool last_peak_tracked_
;
1015 std::deque
<std::size_t> cache_charge_peaks_
;
1016 std::size_t cache_charge_increments_sum_
;
1019 class DBTestBase
: public testing::Test
{
1021 // Sequence of option configurations to try
1022 enum OptionConfig
: int {
1024 kBlockBasedTableWithPrefixHashIndex
= 1,
1025 kBlockBasedTableWithWholeKeyHashIndex
= 2,
1026 kPlainTableFirstBytePrefix
= 3,
1027 kPlainTableCappedPrefix
= 4,
1028 kPlainTableCappedPrefixNonMmap
= 5,
1029 kPlainTableAllBytesPrefix
= 6,
1034 kFullFilterWithNewTableReaderForCompactions
= 11,
1038 kWalDirAndMmapReads
= 15,
1039 kManifestFileSize
= 16,
1042 kUniversalCompaction
= 19,
1043 kUniversalCompactionMultiLevel
= 20,
1044 kCompressedBlockCache
= 21,
1045 kInfiniteMaxOpenFiles
= 22,
1046 kCRC32cChecksum
= 23,
1047 kFIFOCompaction
= 24,
1048 kOptimizeFiltersForHits
= 25,
1050 kRecycleLogFiles
= 27,
1051 kConcurrentSkipList
= 28,
1052 kPipelinedWrite
= 29,
1053 kConcurrentWALWrites
= 30,
1055 kLevelSubcompactions
,
1056 kBlockBasedTableWithIndexRestartInterval
,
1057 kBlockBasedTableWithPartitionedIndex
,
1058 kBlockBasedTableWithPartitionedIndexFormat4
,
1059 kBlockBasedTableWithLatestFormat
,
1060 kPartitionedFilterWithNewTableReaderForCompactions
,
1061 kUniversalSubcompactions
,
1063 // This must be the last line
1068 std::string dbname_
;
1069 std::string alternative_wal_dir_
;
1070 std::string alternative_db_log_dir_
;
1072 Env
* encrypted_env_
;
1074 std::shared_ptr
<Env
> env_guard_
;
1076 std::vector
<ColumnFamilyHandle
*> handles_
;
1079 Options last_options_
;
1081 // Skip some options, as they may not be applicable to a specific test.
1082 // To add more skip constants, use values 4, 8, 16, etc.
1085 kSkipDeletesFilterFirst
= 1,
1086 kSkipUniversalCompaction
= 2,
1088 kSkipPlainTable
= 8,
1089 kSkipHashIndex
= 16,
1090 kSkipNoSeekToLast
= 32,
1091 kSkipFIFOCompaction
= 128,
1092 kSkipMmapReads
= 256,
1095 const int kRangeDelSkipConfigs
=
1096 // Plain tables do not support range deletions.
1098 // MmapReads disables the iterator pinning that RangeDelAggregator
1102 // `env_do_fsync` decides whether the special Env would do real
1103 // fsync for files and directories. Skipping fsync can speed up
1104 // tests, but won't cover the exact fsync logic.
1105 DBTestBase(const std::string path
, bool env_do_fsync
);
1109 static std::string
Key(int i
) {
1111 snprintf(buf
, sizeof(buf
), "key%06d", i
);
1112 return std::string(buf
);
1115 static bool ShouldSkipOptions(int option_config
, int skip_mask
= kNoSkip
);
1117 // Switch to a fresh database with the next option configuration to
1118 // test. Return false if there are no more configurations to test.
1119 bool ChangeOptions(int skip_mask
= kNoSkip
);
1121 // Switch between different compaction styles.
1122 bool ChangeCompactOptions();
1124 // Switch between different WAL-realted options.
1125 bool ChangeWalOptions();
1127 // Switch between different filter policy
1128 // Jump from kDefault to kFilter to kFullFilter
1129 bool ChangeFilterOptions();
1131 // Switch between different DB options for file ingestion tests.
1132 bool ChangeOptionsForFileIngestionTest();
1134 // Return the current option configuration.
1135 Options
CurrentOptions(const anon::OptionsOverride
& options_override
=
1136 anon::OptionsOverride()) const;
1138 Options
CurrentOptions(const Options
& default_options
,
1139 const anon::OptionsOverride
& options_override
=
1140 anon::OptionsOverride()) const;
1142 Options
GetDefaultOptions() const;
1144 Options
GetOptions(int option_config
) const {
1145 return GetOptions(option_config
, GetDefaultOptions());
1148 Options
GetOptions(int option_config
, const Options
& default_options
,
1149 const anon::OptionsOverride
& options_override
=
1150 anon::OptionsOverride()) const;
// Returns the DB handle downcast to DBImpl so tests can reach
// implementation-only hooks. NOTE(review): static_cast_with_check
// presumably validates the dynamic type in debug builds -- see
// util/cast_util.h to confirm.
DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }
1154 void CreateColumnFamilies(const std::vector
<std::string
>& cfs
,
1155 const Options
& options
);
1157 void CreateAndReopenWithCF(const std::vector
<std::string
>& cfs
,
1158 const Options
& options
);
1160 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
1161 const std::vector
<Options
>& options
);
1163 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
1164 const Options
& options
);
1166 Status
TryReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
1167 const std::vector
<Options
>& options
);
1169 Status
TryReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
1170 const Options
& options
);
1172 void Reopen(const Options
& options
);
1176 void DestroyAndReopen(const Options
& options
);
1178 void Destroy(const Options
& options
, bool delete_cf_paths
= false);
1180 Status
ReadOnlyReopen(const Options
& options
);
1182 Status
TryReopen(const Options
& options
);
1184 bool IsDirectIOSupported();
1186 bool IsMemoryMappedAccessSupported() const;
1188 Status
Flush(int cf
= 0);
1190 Status
Flush(const std::vector
<int>& cf_ids
);
1192 Status
Put(const Slice
& k
, const Slice
& v
, WriteOptions wo
= WriteOptions());
1194 Status
Put(int cf
, const Slice
& k
, const Slice
& v
,
1195 WriteOptions wo
= WriteOptions());
1197 Status
Merge(const Slice
& k
, const Slice
& v
,
1198 WriteOptions wo
= WriteOptions());
1200 Status
Merge(int cf
, const Slice
& k
, const Slice
& v
,
1201 WriteOptions wo
= WriteOptions());
1203 Status
Delete(const std::string
& k
);
1205 Status
Delete(int cf
, const std::string
& k
);
1207 Status
SingleDelete(const std::string
& k
);
1209 Status
SingleDelete(int cf
, const std::string
& k
);
1211 std::string
Get(const std::string
& k
, const Snapshot
* snapshot
= nullptr);
1213 std::string
Get(int cf
, const std::string
& k
,
1214 const Snapshot
* snapshot
= nullptr);
1216 Status
Get(const std::string
& k
, PinnableSlice
* v
);
1218 std::vector
<std::string
> MultiGet(std::vector
<int> cfs
,
1219 const std::vector
<std::string
>& k
,
1220 const Snapshot
* snapshot
,
1222 const bool async
= false);
1224 std::vector
<std::string
> MultiGet(const std::vector
<std::string
>& k
,
1225 const Snapshot
* snapshot
= nullptr,
1226 const bool async
= false);
1228 uint64_t GetNumSnapshots();
1230 uint64_t GetTimeOldestSnapshots();
1232 uint64_t GetSequenceOldestSnapshots();
1234 // Return a string that contains all key,value pairs in order,
1235 // formatted like "(k1->v1)(k2->v2)".
1236 std::string
Contents(int cf
= 0);
1238 std::string
AllEntriesFor(const Slice
& user_key
, int cf
= 0);
1240 // Similar to AllEntriesFor but this function also covers reopen with fifo.
1241 // Note that test cases with snapshots or entries in memtable should simply
1242 // use AllEntriesFor instead as snapshots and entries in memtable will
1243 // survive after db reopen.
1244 void CheckAllEntriesWithFifoReopen(const std::string
& expected_value
,
1245 const Slice
& user_key
, int cf
,
1246 const std::vector
<std::string
>& cfs
,
1247 const Options
& options
);
1249 #ifndef ROCKSDB_LITE
1250 int NumSortedRuns(int cf
= 0);
1252 uint64_t TotalSize(int cf
= 0);
1254 uint64_t SizeAtLevel(int level
);
1256 size_t TotalLiveFiles(int cf
= 0);
1258 size_t CountLiveFiles();
1260 int NumTableFilesAtLevel(int level
, int cf
= 0);
1262 double CompressionRatioAtLevel(int level
, int cf
= 0);
1264 int TotalTableFiles(int cf
= 0, int levels
= -1);
1265 #endif // ROCKSDB_LITE
1267 std::vector
<uint64_t> GetBlobFileNumbers();
1269 // Return spread of files per level
1270 std::string
FilesPerLevel(int cf
= 0);
1272 size_t CountFiles();
1274 Status
CountFiles(size_t* count
);
1276 Status
Size(const Slice
& start
, const Slice
& limit
, uint64_t* size
) {
1277 return Size(start
, limit
, 0, size
);
1280 Status
Size(const Slice
& start
, const Slice
& limit
, int cf
, uint64_t* size
);
1282 void Compact(int cf
, const Slice
& start
, const Slice
& limit
,
1283 uint32_t target_path_id
);
1285 void Compact(int cf
, const Slice
& start
, const Slice
& limit
);
1287 void Compact(const Slice
& start
, const Slice
& limit
);
1289 // Do n memtable compactions, each of which produces an sstable
1290 // covering the range [small,large].
1291 void MakeTables(int n
, const std::string
& small
, const std::string
& large
,
1294 // Prevent pushing of new sstables into deeper levels by adding
1295 // tables that cover a specified range to all levels.
1296 void FillLevels(const std::string
& smallest
, const std::string
& largest
,
1299 void MoveFilesToLevel(int level
, int cf
= 0);
1301 #ifndef ROCKSDB_LITE
1302 void DumpFileCounts(const char* label
);
1303 #endif // ROCKSDB_LITE
1305 std::string
DumpSSTableList();
1307 static void GetSstFiles(Env
* env
, std::string path
,
1308 std::vector
<std::string
>* files
);
1310 int GetSstFileCount(std::string path
);
1312 // this will generate non-overlapping files since it keeps increasing key_idx
1313 void GenerateNewFile(Random
* rnd
, int* key_idx
, bool nowait
= false);
1315 void GenerateNewFile(int fd
, Random
* rnd
, int* key_idx
, bool nowait
= false);
1317 static const int kNumKeysByGenerateNewRandomFile
;
1318 static const int KNumKeysByGenerateNewFile
= 100;
1320 void GenerateNewRandomFile(Random
* rnd
, bool nowait
= false);
1322 std::string
IterStatus(Iterator
* iter
);
1324 Options
OptionsForLogIterTest();
1326 std::string
DummyString(size_t len
, char c
= 'a');
1328 void VerifyIterLast(std::string expected_key
, int cf
= 0);
1330 // Used to test InplaceUpdate
1332 // If previous value is nullptr or delta is > than previous value,
1333 // sets newValue with delta
1334 // If previous value is not empty,
1335 // updates previous value with 'b' string of previous value size - 1.
1336 static UpdateStatus
updateInPlaceSmallerSize(char* prevValue
,
1337 uint32_t* prevSize
, Slice delta
,
1338 std::string
* newValue
);
1340 static UpdateStatus
updateInPlaceSmallerVarintSize(char* prevValue
,
1343 std::string
* newValue
);
1345 static UpdateStatus
updateInPlaceLargerSize(char* prevValue
,
1346 uint32_t* prevSize
, Slice delta
,
1347 std::string
* newValue
);
1349 static UpdateStatus
updateInPlaceNoAction(char* prevValue
, uint32_t* prevSize
,
1350 Slice delta
, std::string
* newValue
);
1352 // Utility method to test InplaceUpdate
1353 void validateNumberOfEntries(int numValues
, int cf
= 0);
1355 void CopyFile(const std::string
& source
, const std::string
& destination
,
1358 Status
GetAllDataFiles(const FileType file_type
,
1359 std::unordered_map
<std::string
, uint64_t>* sst_files
,
1360 uint64_t* total_size
= nullptr);
1362 std::vector
<std::uint64_t> ListTableFiles(Env
* env
, const std::string
& path
);
1364 void VerifyDBFromMap(
1365 std::map
<std::string
, std::string
> true_data
,
1366 size_t* total_reads_res
= nullptr, bool tailing_iter
= false,
1367 std::map
<std::string
, Status
> status
= std::map
<std::string
, Status
>());
1369 void VerifyDBInternal(
1370 std::vector
<std::pair
<std::string
, std::string
>> true_data
);
1372 #ifndef ROCKSDB_LITE
1373 uint64_t GetNumberOfSstFilesForColumnFamily(DB
* db
,
1374 std::string column_family_name
);
1376 uint64_t GetSstSizeHelper(Temperature temperature
);
1377 #endif // ROCKSDB_LITE
1379 uint64_t TestGetTickerCount(const Options
& options
, Tickers ticker_type
) {
1380 return options
.statistics
->getTickerCount(ticker_type
);
1383 uint64_t TestGetAndResetTickerCount(const Options
& options
,
1384 Tickers ticker_type
) {
1385 return options
.statistics
->getAndResetTickerCount(ticker_type
);
1388 // Note: reverting this setting within the same test run is not yet
1390 void SetTimeElapseOnlySleepOnReopen(DBOptions
* options
);
1392 private: // Prone to error on direct use
1393 void MaybeInstallTimeElapseOnlySleep(const DBOptions
& options
);
1395 bool time_elapse_only_sleep_on_reopen_
= false;
1398 // For verifying that all files generated by current version have SST
1400 void VerifySstUniqueIds(const TablePropertiesCollection
& props
);
1402 } // namespace ROCKSDB_NAMESPACE