// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <cassert>
#include <cstdarg>
#include <cstring>
#include <deque>
#include <string>
#include <unordered_map>
#include <vector>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/table.h"
#include "table/block_based_table_factory.h"
#include "table/internal_iterator.h"
#include "table/plain_table_factory.h"
#include "util/mutexlock.h"
#include "util/random.h"
// Forward declarations for file types that the helpers in this header only
// handle through pointers (see GetSequentialFileReader and StringEnv).
class SequentialFile;
class SequentialFileReader;
35 // Store in *dst a random string of length "len" and return a Slice that
36 // references the generated data.
37 extern Slice
RandomString(Random
* rnd
, int len
, std::string
* dst
);
39 extern std::string
RandomHumanReadableString(Random
* rnd
, int len
);
41 // Return a random key with the specified length that may contain interesting
42 // characters (e.g. \x00, \xff, etc.).
43 enum RandomKeyType
: char { RANDOM
, LARGEST
, SMALLEST
, MIDDLE
};
44 extern std::string
RandomKey(Random
* rnd
, int len
,
45 RandomKeyType type
= RandomKeyType::RANDOM
);
47 // Store in *dst a string of length "len" that will compress to
48 // "N*compressed_fraction" bytes and return a Slice that references
49 // the generated data.
50 extern Slice
CompressibleString(Random
* rnd
, double compressed_fraction
,
51 int len
, std::string
* dst
);
53 // A wrapper that allows injection of errors.
54 class ErrorEnv
: public EnvWrapper
{
56 bool writable_file_error_
;
57 int num_writable_file_errors_
;
59 ErrorEnv() : EnvWrapper(Env::Default()),
60 writable_file_error_(false),
61 num_writable_file_errors_(0) { }
63 virtual Status
NewWritableFile(const std::string
& fname
,
64 unique_ptr
<WritableFile
>* result
,
65 const EnvOptions
& soptions
) override
{
67 if (writable_file_error_
) {
68 ++num_writable_file_errors_
;
69 return Status::IOError(fname
, "fake error");
71 return target()->NewWritableFile(fname
, result
, soptions
);
75 // An internal comparator that just forward comparing results from the
76 // user comparator in it. Can be used to test entities that have no dependency
77 // on internal key structure but consumes InternalKeyComparator, like
79 class PlainInternalKeyComparator
: public InternalKeyComparator
{
81 explicit PlainInternalKeyComparator(const Comparator
* c
)
82 : InternalKeyComparator(c
) {}
84 virtual ~PlainInternalKeyComparator() {}
86 virtual int Compare(const Slice
& a
, const Slice
& b
) const override
{
87 return user_comparator()->Compare(a
, b
);
89 virtual void FindShortestSeparator(std::string
* start
,
90 const Slice
& limit
) const override
{
91 user_comparator()->FindShortestSeparator(start
, limit
);
93 virtual void FindShortSuccessor(std::string
* key
) const override
{
94 user_comparator()->FindShortSuccessor(key
);
98 // A test comparator which compare two strings in this way:
99 // (1) first compare prefix of 8 bytes in alphabet order,
100 // (2) if two strings share the same prefix, sort the other part of the string
101 // in the reverse alphabet order.
102 // This helps simulate the case of compounded key of [entity][timestamp] and
103 // latest timestamp first.
104 class SimpleSuffixReverseComparator
: public Comparator
{
106 SimpleSuffixReverseComparator() {}
108 virtual const char* Name() const override
{
109 return "SimpleSuffixReverseComparator";
112 virtual int Compare(const Slice
& a
, const Slice
& b
) const override
{
113 Slice prefix_a
= Slice(a
.data(), 8);
114 Slice prefix_b
= Slice(b
.data(), 8);
115 int prefix_comp
= prefix_a
.compare(prefix_b
);
116 if (prefix_comp
!= 0) {
119 Slice suffix_a
= Slice(a
.data() + 8, a
.size() - 8);
120 Slice suffix_b
= Slice(b
.data() + 8, b
.size() - 8);
121 return -(suffix_a
.compare(suffix_b
));
124 virtual void FindShortestSeparator(std::string
* start
,
125 const Slice
& limit
) const override
{}
127 virtual void FindShortSuccessor(std::string
* key
) const override
{}
130 // Returns a user key comparator that can be used for comparing two uint64_t
131 // slices. Instead of comparing slices byte-wise, it compares all the 8 bytes
132 // at once. Assumes same endian-ness is used though the database's lifetime.
133 // Symantics of comparison would differ from Bytewise comparator in little
135 extern const Comparator
* Uint64Comparator();
137 // Iterator over a vector of keys/values
138 class VectorIterator
: public InternalIterator
{
140 explicit VectorIterator(const std::vector
<std::string
>& keys
)
141 : keys_(keys
), current_(keys
.size()) {
142 std::sort(keys_
.begin(), keys_
.end());
143 values_
.resize(keys
.size());
146 VectorIterator(const std::vector
<std::string
>& keys
,
147 const std::vector
<std::string
>& values
)
148 : keys_(keys
), values_(values
), current_(keys
.size()) {
149 assert(keys_
.size() == values_
.size());
152 virtual bool Valid() const override
{ return current_
< keys_
.size(); }
154 virtual void SeekToFirst() override
{ current_
= 0; }
155 virtual void SeekToLast() override
{ current_
= keys_
.size() - 1; }
157 virtual void Seek(const Slice
& target
) override
{
158 current_
= std::lower_bound(keys_
.begin(), keys_
.end(), target
.ToString()) -
162 virtual void SeekForPrev(const Slice
& target
) override
{
163 current_
= std::upper_bound(keys_
.begin(), keys_
.end(), target
.ToString()) -
172 virtual void Next() override
{ current_
++; }
173 virtual void Prev() override
{ current_
--; }
175 virtual Slice
key() const override
{ return Slice(keys_
[current_
]); }
176 virtual Slice
value() const override
{ return Slice(values_
[current_
]); }
178 virtual Status
status() const override
{ return Status::OK(); }
181 std::vector
<std::string
> keys_
;
182 std::vector
<std::string
> values_
;
185 extern WritableFileWriter
* GetWritableFileWriter(WritableFile
* wf
);
187 extern RandomAccessFileReader
* GetRandomAccessFileReader(RandomAccessFile
* raf
);
189 extern SequentialFileReader
* GetSequentialFileReader(SequentialFile
* se
);
191 class StringSink
: public WritableFile
{
193 std::string contents_
;
195 explicit StringSink(Slice
* reader_contents
= nullptr) :
198 reader_contents_(reader_contents
),
200 if (reader_contents_
!= nullptr) {
201 *reader_contents_
= Slice(contents_
.data(), 0);
205 const std::string
& contents() const { return contents_
; }
207 virtual Status
Truncate(uint64_t size
) override
{
208 contents_
.resize(static_cast<size_t>(size
));
211 virtual Status
Close() override
{ return Status::OK(); }
212 virtual Status
Flush() override
{
213 if (reader_contents_
!= nullptr) {
214 assert(reader_contents_
->size() <= last_flush_
);
215 size_t offset
= last_flush_
- reader_contents_
->size();
216 *reader_contents_
= Slice(
217 contents_
.data() + offset
,
218 contents_
.size() - offset
);
219 last_flush_
= contents_
.size();
224 virtual Status
Sync() override
{ return Status::OK(); }
225 virtual Status
Append(const Slice
& slice
) override
{
226 contents_
.append(slice
.data(), slice
.size());
229 void Drop(size_t bytes
) {
230 if (reader_contents_
!= nullptr) {
231 contents_
.resize(contents_
.size() - bytes
);
232 *reader_contents_
= Slice(
233 reader_contents_
->data(), reader_contents_
->size() - bytes
);
234 last_flush_
= contents_
.size();
239 Slice
* reader_contents_
;
243 // A wrapper around a StringSink to give it a RandomRWFile interface
244 class RandomRWStringSink
: public RandomRWFile
{
246 explicit RandomRWStringSink(StringSink
* ss
) : ss_(ss
) {}
248 Status
Write(uint64_t offset
, const Slice
& data
) {
249 if (offset
+ data
.size() > ss_
->contents_
.size()) {
250 ss_
->contents_
.resize(offset
+ data
.size(), '\0');
253 char* pos
= const_cast<char*>(ss_
->contents_
.data() + offset
);
254 memcpy(pos
, data
.data(), data
.size());
258 Status
Read(uint64_t offset
, size_t n
, Slice
* result
, char* scratch
) const {
259 *result
= Slice(nullptr, 0);
260 if (offset
< ss_
->contents_
.size()) {
262 std::min(static_cast<size_t>(ss_
->contents_
.size() - offset
), n
);
263 *result
= Slice(ss_
->contents_
.data() + offset
, str_res_sz
);
268 Status
Flush() { return Status::OK(); }
270 Status
Sync() { return Status::OK(); }
272 Status
Close() { return Status::OK(); }
274 const std::string
& contents() const { return ss_
->contents(); }
280 // Like StringSink, this writes into a string. Unlink StringSink, it
281 // has some initial content and overwrites it, just like a recycled
283 class OverwritingStringSink
: public WritableFile
{
285 explicit OverwritingStringSink(Slice
* reader_contents
)
288 reader_contents_(reader_contents
),
291 const std::string
& contents() const { return contents_
; }
293 virtual Status
Truncate(uint64_t size
) override
{
294 contents_
.resize(static_cast<size_t>(size
));
297 virtual Status
Close() override
{ return Status::OK(); }
298 virtual Status
Flush() override
{
299 if (last_flush_
< contents_
.size()) {
300 assert(reader_contents_
->size() >= contents_
.size());
301 memcpy((char*)reader_contents_
->data() + last_flush_
,
302 contents_
.data() + last_flush_
, contents_
.size() - last_flush_
);
303 last_flush_
= contents_
.size();
307 virtual Status
Sync() override
{ return Status::OK(); }
308 virtual Status
Append(const Slice
& slice
) override
{
309 contents_
.append(slice
.data(), slice
.size());
312 void Drop(size_t bytes
) {
313 contents_
.resize(contents_
.size() - bytes
);
314 if (last_flush_
> contents_
.size()) last_flush_
= contents_
.size();
318 std::string contents_
;
319 Slice
* reader_contents_
;
323 class StringSource
: public RandomAccessFile
{
325 explicit StringSource(const Slice
& contents
, uint64_t uniq_id
= 0,
327 : contents_(contents
.data(), contents
.size()),
332 virtual ~StringSource() { }
334 uint64_t Size() const { return contents_
.size(); }
336 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
337 char* scratch
) const override
{
339 if (offset
> contents_
.size()) {
340 return Status::InvalidArgument("invalid Read offset");
342 if (offset
+ n
> contents_
.size()) {
343 n
= contents_
.size() - static_cast<size_t>(offset
);
346 memcpy(scratch
, &contents_
[static_cast<size_t>(offset
)], n
);
347 *result
= Slice(scratch
, n
);
349 *result
= Slice(&contents_
[static_cast<size_t>(offset
)], n
);
354 virtual size_t GetUniqueId(char* id
, size_t max_size
) const override
{
360 rid
= EncodeVarint64(rid
, uniq_id_
);
361 rid
= EncodeVarint64(rid
, 0);
362 return static_cast<size_t>(rid
-id
);
365 int total_reads() const { return total_reads_
; }
367 void set_total_reads(int tr
) { total_reads_
= tr
; }
370 std::string contents_
;
373 mutable int total_reads_
;
376 class NullLogger
: public Logger
{
379 virtual void Logv(const char* format
, va_list ap
) override
{}
380 virtual size_t GetLogFileSize() const override
{ return 0; }
383 // Corrupts key by changing the type
384 extern void CorruptKeyType(InternalKey
* ikey
);
386 extern std::string
KeyStr(const std::string
& user_key
,
387 const SequenceNumber
& seq
, const ValueType
& t
,
388 bool corrupt
= false);
390 class SleepingBackgroundTask
{
392 SleepingBackgroundTask()
395 done_with_sleep_(false),
399 MutexLock
l(&mutex_
);
403 MutexLock
l(&mutex_
);
406 while (should_sleep_
) {
410 done_with_sleep_
= true;
413 void WaitUntilSleeping() {
414 MutexLock
l(&mutex_
);
415 while (!sleeping_
|| !should_sleep_
) {
420 MutexLock
l(&mutex_
);
421 should_sleep_
= false;
424 void WaitUntilDone() {
425 MutexLock
l(&mutex_
);
426 while (!done_with_sleep_
) {
431 MutexLock
l(&mutex_
);
432 return should_sleep_
== false;
436 MutexLock
l(&mutex_
);
437 should_sleep_
= true;
438 done_with_sleep_
= false;
441 static void DoSleepTask(void* arg
) {
442 reinterpret_cast<SleepingBackgroundTask
*>(arg
)->DoSleep();
447 port::CondVar bg_cv_
; // Signalled when background work finishes
449 bool done_with_sleep_
;
453 // Filters merge operands and values that are equal to `num`.
454 class FilterNumber
: public CompactionFilter
{
456 explicit FilterNumber(uint64_t num
) : num_(num
) {}
458 std::string
last_merge_operand_key() { return last_merge_operand_key_
; }
460 bool Filter(int level
, const rocksdb::Slice
& key
, const rocksdb::Slice
& value
,
461 std::string
* new_value
, bool* value_changed
) const override
{
462 if (value
.size() == sizeof(uint64_t)) {
463 return num_
== DecodeFixed64(value
.data());
468 bool FilterMergeOperand(int level
, const rocksdb::Slice
& key
,
469 const rocksdb::Slice
& value
) const override
{
470 last_merge_operand_key_
= key
.ToString();
471 if (value
.size() == sizeof(uint64_t)) {
472 return num_
== DecodeFixed64(value
.data());
477 const char* Name() const override
{ return "FilterBadMergeOperand"; }
480 mutable std::string last_merge_operand_key_
;
484 inline std::string
EncodeInt(uint64_t x
) {
486 PutFixed64(&result
, x
);
490 class StringEnv
: public EnvWrapper
{
492 class SeqStringSource
: public SequentialFile
{
494 explicit SeqStringSource(const std::string
& data
)
495 : data_(data
), offset_(0) {}
496 ~SeqStringSource() {}
497 Status
Read(size_t n
, Slice
* result
, char* scratch
) override
{
499 if (offset_
< data_
.size()) {
500 n
= std::min(data_
.size() - offset_
, n
);
501 memcpy(scratch
, data_
.data() + offset_
, n
);
503 *result
= Slice(scratch
, n
);
505 return Status::InvalidArgument(
506 "Attemp to read when it already reached eof.");
510 Status
Skip(uint64_t n
) override
{
511 if (offset_
>= data_
.size()) {
512 return Status::InvalidArgument(
513 "Attemp to read when it already reached eof.");
515 // TODO(yhchiang): Currently doesn't handle the overflow case.
525 class StringSink
: public WritableFile
{
527 explicit StringSink(std::string
* contents
)
528 : WritableFile(), contents_(contents
) {}
529 virtual Status
Truncate(uint64_t size
) override
{
530 contents_
->resize(size
);
533 virtual Status
Close() override
{ return Status::OK(); }
534 virtual Status
Flush() override
{ return Status::OK(); }
535 virtual Status
Sync() override
{ return Status::OK(); }
536 virtual Status
Append(const Slice
& slice
) override
{
537 contents_
->append(slice
.data(), slice
.size());
542 std::string
* contents_
;
545 explicit StringEnv(Env
* t
) : EnvWrapper(t
) {}
546 virtual ~StringEnv() {}
548 const std::string
& GetContent(const std::string
& f
) { return files_
[f
]; }
550 const Status
WriteToNewFile(const std::string
& file_name
,
551 const std::string
& content
) {
552 unique_ptr
<WritableFile
> r
;
553 auto s
= NewWritableFile(file_name
, &r
, EnvOptions());
560 assert(files_
[file_name
] == content
);
564 // The following text is boilerplate that forwards all methods to target()
565 Status
NewSequentialFile(const std::string
& f
, unique_ptr
<SequentialFile
>* r
,
566 const EnvOptions
& options
) override
{
567 auto iter
= files_
.find(f
);
568 if (iter
== files_
.end()) {
569 return Status::NotFound("The specified file does not exist", f
);
571 r
->reset(new SeqStringSource(iter
->second
));
574 Status
NewRandomAccessFile(const std::string
& f
,
575 unique_ptr
<RandomAccessFile
>* r
,
576 const EnvOptions
& options
) override
{
577 return Status::NotSupported();
579 Status
NewWritableFile(const std::string
& f
, unique_ptr
<WritableFile
>* r
,
580 const EnvOptions
& options
) override
{
581 auto iter
= files_
.find(f
);
582 if (iter
!= files_
.end()) {
583 return Status::IOError("The specified file already exists", f
);
585 r
->reset(new StringSink(&files_
[f
]));
588 virtual Status
NewDirectory(const std::string
& name
,
589 unique_ptr
<Directory
>* result
) override
{
590 return Status::NotSupported();
592 Status
FileExists(const std::string
& f
) override
{
593 if (files_
.find(f
) == files_
.end()) {
594 return Status::NotFound();
598 Status
GetChildren(const std::string
& dir
,
599 std::vector
<std::string
>* r
) override
{
600 return Status::NotSupported();
602 Status
DeleteFile(const std::string
& f
) override
{
606 Status
CreateDir(const std::string
& d
) override
{
607 return Status::NotSupported();
609 Status
CreateDirIfMissing(const std::string
& d
) override
{
610 return Status::NotSupported();
612 Status
DeleteDir(const std::string
& d
) override
{
613 return Status::NotSupported();
615 Status
GetFileSize(const std::string
& f
, uint64_t* s
) override
{
616 auto iter
= files_
.find(f
);
617 if (iter
== files_
.end()) {
618 return Status::NotFound("The specified file does not exist:", f
);
620 *s
= iter
->second
.size();
624 Status
GetFileModificationTime(const std::string
& fname
,
625 uint64_t* file_mtime
) override
{
626 return Status::NotSupported();
629 Status
RenameFile(const std::string
& s
, const std::string
& t
) override
{
630 return Status::NotSupported();
633 Status
LinkFile(const std::string
& s
, const std::string
& t
) override
{
634 return Status::NotSupported();
637 Status
LockFile(const std::string
& f
, FileLock
** l
) override
{
638 return Status::NotSupported();
641 Status
UnlockFile(FileLock
* l
) override
{ return Status::NotSupported(); }
644 std::unordered_map
<std::string
, std::string
> files_
;
647 // Randomly initialize the given DBOptions
648 void RandomInitDBOptions(DBOptions
* db_opt
, Random
* rnd
);
650 // Randomly initialize the given ColumnFamilyOptions
651 // Note that the caller is responsible for releasing non-null
652 // cf_opt->compaction_filter.
653 void RandomInitCFOptions(ColumnFamilyOptions
* cf_opt
, Random
* rnd
);
655 // A dummy merge operator which can change its name
656 class ChanglingMergeOperator
: public MergeOperator
{
658 explicit ChanglingMergeOperator(const std::string
& name
)
659 : name_(name
+ "MergeOperator") {}
660 ~ChanglingMergeOperator() {}
662 void SetName(const std::string
& name
) { name_
= name
; }
664 virtual bool FullMergeV2(const MergeOperationInput
& merge_in
,
665 MergeOperationOutput
* merge_out
) const override
{
668 virtual bool PartialMergeMulti(const Slice
& key
,
669 const std::deque
<Slice
>& operand_list
,
670 std::string
* new_value
,
671 Logger
* logger
) const override
{
674 virtual const char* Name() const override
{ return name_
.c_str(); }
680 // Returns a dummy merge operator with random name.
681 MergeOperator
* RandomMergeOperator(Random
* rnd
);
683 // A dummy compaction filter which can change its name
684 class ChanglingCompactionFilter
: public CompactionFilter
{
686 explicit ChanglingCompactionFilter(const std::string
& name
)
687 : name_(name
+ "CompactionFilter") {}
688 ~ChanglingCompactionFilter() {}
690 void SetName(const std::string
& name
) { name_
= name
; }
692 bool Filter(int level
, const Slice
& key
, const Slice
& existing_value
,
693 std::string
* new_value
, bool* value_changed
) const override
{
697 const char* Name() const override
{ return name_
.c_str(); }
703 // Returns a dummy compaction filter with a random name.
704 CompactionFilter
* RandomCompactionFilter(Random
* rnd
);
706 // A dummy compaction filter factory which can change its name
707 class ChanglingCompactionFilterFactory
: public CompactionFilterFactory
{
709 explicit ChanglingCompactionFilterFactory(const std::string
& name
)
710 : name_(name
+ "CompactionFilterFactory") {}
711 ~ChanglingCompactionFilterFactory() {}
713 void SetName(const std::string
& name
) { name_
= name
; }
715 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
716 const CompactionFilter::Context
& context
) override
{
717 return std::unique_ptr
<CompactionFilter
>();
720 // Returns a name that identifies this compaction filter factory.
721 const char* Name() const override
{ return name_
.c_str(); }
727 CompressionType
RandomCompressionType(Random
* rnd
);
729 void RandomCompressionTypeVector(const size_t count
,
730 std::vector
<CompressionType
>* types
,
733 CompactionFilterFactory
* RandomCompactionFilterFactory(Random
* rnd
);
735 const SliceTransform
* RandomSliceTransform(Random
* rnd
, int pre_defined
= -1);
737 TableFactory
* RandomTableFactory(Random
* rnd
, int pre_defined
= -1);
739 std::string
RandomName(Random
* rnd
, const size_t len
);
741 Status
DestroyDir(Env
* env
, const std::string
& dir
);
744 } // namespace rocksdb