1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <cassert>
#include <cstring>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/table.h"
#include "table/block_based_table_factory.h"
#include "table/internal_iterator.h"
#include "table/plain_table_factory.h"
#include "util/mutexlock.h"
#include "util/random.h"
// Only used through pointers below, so a forward declaration suffices.
class SequentialFileReader;

// Table format versions used by tests. Defined out of line
// (presumably in the companion .cc file — confirm).
extern const uint32_t kDefaultFormatVersion;
extern const uint32_t kLatestFormatVersion;
38 // Store in *dst a random string of length "len" and return a Slice that
39 // references the generated data.
40 extern Slice
RandomString(Random
* rnd
, int len
, std::string
* dst
);
42 extern std::string
RandomHumanReadableString(Random
* rnd
, int len
);
44 // Return a random key with the specified length that may contain interesting
45 // characters (e.g. \x00, \xff, etc.).
46 enum RandomKeyType
: char { RANDOM
, LARGEST
, SMALLEST
, MIDDLE
};
47 extern std::string
RandomKey(Random
* rnd
, int len
,
48 RandomKeyType type
= RandomKeyType::RANDOM
);
50 // Store in *dst a string of length "len" that will compress to
51 // "N*compressed_fraction" bytes and return a Slice that references
52 // the generated data.
53 extern Slice
CompressibleString(Random
* rnd
, double compressed_fraction
,
54 int len
, std::string
* dst
);
56 // A wrapper that allows injection of errors.
57 class ErrorEnv
: public EnvWrapper
{
59 bool writable_file_error_
;
60 int num_writable_file_errors_
;
62 ErrorEnv() : EnvWrapper(Env::Default()),
63 writable_file_error_(false),
64 num_writable_file_errors_(0) { }
66 virtual Status
NewWritableFile(const std::string
& fname
,
67 std::unique_ptr
<WritableFile
>* result
,
68 const EnvOptions
& soptions
) override
{
70 if (writable_file_error_
) {
71 ++num_writable_file_errors_
;
72 return Status::IOError(fname
, "fake error");
74 return target()->NewWritableFile(fname
, result
, soptions
);
79 // An internal comparator that just forward comparing results from the
80 // user comparator in it. Can be used to test entities that have no dependency
81 // on internal key structure but consumes InternalKeyComparator, like
83 class PlainInternalKeyComparator
: public InternalKeyComparator
{
85 explicit PlainInternalKeyComparator(const Comparator
* c
)
86 : InternalKeyComparator(c
) {}
88 virtual ~PlainInternalKeyComparator() {}
90 virtual int Compare(const Slice
& a
, const Slice
& b
) const override
{
91 return user_comparator()->Compare(a
, b
);
96 // A test comparator which compare two strings in this way:
97 // (1) first compare prefix of 8 bytes in alphabet order,
98 // (2) if two strings share the same prefix, sort the other part of the string
99 // in the reverse alphabet order.
100 // This helps simulate the case of compounded key of [entity][timestamp] and
101 // latest timestamp first.
102 class SimpleSuffixReverseComparator
: public Comparator
{
104 SimpleSuffixReverseComparator() {}
106 virtual const char* Name() const override
{
107 return "SimpleSuffixReverseComparator";
110 virtual int Compare(const Slice
& a
, const Slice
& b
) const override
{
111 Slice prefix_a
= Slice(a
.data(), 8);
112 Slice prefix_b
= Slice(b
.data(), 8);
113 int prefix_comp
= prefix_a
.compare(prefix_b
);
114 if (prefix_comp
!= 0) {
117 Slice suffix_a
= Slice(a
.data() + 8, a
.size() - 8);
118 Slice suffix_b
= Slice(b
.data() + 8, b
.size() - 8);
119 return -(suffix_a
.compare(suffix_b
));
122 virtual void FindShortestSeparator(std::string
* /*start*/,
123 const Slice
& /*limit*/) const override
{}
125 virtual void FindShortSuccessor(std::string
* /*key*/) const override
{}
128 // Returns a user key comparator that can be used for comparing two uint64_t
129 // slices. Instead of comparing slices byte-wise, it compares all the 8 bytes
130 // at once. Assumes same endian-ness is used though the database's lifetime.
131 // Symantics of comparison would differ from Bytewise comparator in little
133 extern const Comparator
* Uint64Comparator();
135 // Iterator over a vector of keys/values
136 class VectorIterator
: public InternalIterator
{
138 explicit VectorIterator(const std::vector
<std::string
>& keys
)
139 : keys_(keys
), current_(keys
.size()) {
140 std::sort(keys_
.begin(), keys_
.end());
141 values_
.resize(keys
.size());
144 VectorIterator(const std::vector
<std::string
>& keys
,
145 const std::vector
<std::string
>& values
)
146 : keys_(keys
), values_(values
), current_(keys
.size()) {
147 assert(keys_
.size() == values_
.size());
150 virtual bool Valid() const override
{ return current_
< keys_
.size(); }
152 virtual void SeekToFirst() override
{ current_
= 0; }
153 virtual void SeekToLast() override
{ current_
= keys_
.size() - 1; }
155 virtual void Seek(const Slice
& target
) override
{
156 current_
= std::lower_bound(keys_
.begin(), keys_
.end(), target
.ToString()) -
160 virtual void SeekForPrev(const Slice
& target
) override
{
161 current_
= std::upper_bound(keys_
.begin(), keys_
.end(), target
.ToString()) -
170 virtual void Next() override
{ current_
++; }
171 virtual void Prev() override
{ current_
--; }
173 virtual Slice
key() const override
{ return Slice(keys_
[current_
]); }
174 virtual Slice
value() const override
{ return Slice(values_
[current_
]); }
176 virtual Status
status() const override
{ return Status::OK(); }
178 virtual bool IsKeyPinned() const override
{ return true; }
179 virtual bool IsValuePinned() const override
{ return true; }
182 std::vector
<std::string
> keys_
;
183 std::vector
<std::string
> values_
;
186 extern WritableFileWriter
* GetWritableFileWriter(WritableFile
* wf
,
187 const std::string
& fname
);
189 extern RandomAccessFileReader
* GetRandomAccessFileReader(RandomAccessFile
* raf
);
191 extern SequentialFileReader
* GetSequentialFileReader(SequentialFile
* se
,
192 const std::string
& fname
);
194 class StringSink
: public WritableFile
{
196 std::string contents_
;
198 explicit StringSink(Slice
* reader_contents
= nullptr) :
201 reader_contents_(reader_contents
),
203 if (reader_contents_
!= nullptr) {
204 *reader_contents_
= Slice(contents_
.data(), 0);
208 const std::string
& contents() const { return contents_
; }
210 virtual Status
Truncate(uint64_t size
) override
{
211 contents_
.resize(static_cast<size_t>(size
));
214 virtual Status
Close() override
{ return Status::OK(); }
215 virtual Status
Flush() override
{
216 if (reader_contents_
!= nullptr) {
217 assert(reader_contents_
->size() <= last_flush_
);
218 size_t offset
= last_flush_
- reader_contents_
->size();
219 *reader_contents_
= Slice(
220 contents_
.data() + offset
,
221 contents_
.size() - offset
);
222 last_flush_
= contents_
.size();
227 virtual Status
Sync() override
{ return Status::OK(); }
228 virtual Status
Append(const Slice
& slice
) override
{
229 contents_
.append(slice
.data(), slice
.size());
232 void Drop(size_t bytes
) {
233 if (reader_contents_
!= nullptr) {
234 contents_
.resize(contents_
.size() - bytes
);
235 *reader_contents_
= Slice(
236 reader_contents_
->data(), reader_contents_
->size() - bytes
);
237 last_flush_
= contents_
.size();
242 Slice
* reader_contents_
;
246 // A wrapper around a StringSink to give it a RandomRWFile interface
247 class RandomRWStringSink
: public RandomRWFile
{
249 explicit RandomRWStringSink(StringSink
* ss
) : ss_(ss
) {}
251 Status
Write(uint64_t offset
, const Slice
& data
) override
{
252 if (offset
+ data
.size() > ss_
->contents_
.size()) {
253 ss_
->contents_
.resize(static_cast<size_t>(offset
) + data
.size(), '\0');
256 char* pos
= const_cast<char*>(ss_
->contents_
.data() + offset
);
257 memcpy(pos
, data
.data(), data
.size());
261 Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
262 char* /*scratch*/) const override
{
263 *result
= Slice(nullptr, 0);
264 if (offset
< ss_
->contents_
.size()) {
266 std::min(static_cast<size_t>(ss_
->contents_
.size() - offset
), n
);
267 *result
= Slice(ss_
->contents_
.data() + offset
, str_res_sz
);
272 Status
Flush() override
{ return Status::OK(); }
274 Status
Sync() override
{ return Status::OK(); }
276 Status
Close() override
{ return Status::OK(); }
278 const std::string
& contents() const { return ss_
->contents(); }
284 // Like StringSink, this writes into a string. Unlink StringSink, it
285 // has some initial content and overwrites it, just like a recycled
287 class OverwritingStringSink
: public WritableFile
{
289 explicit OverwritingStringSink(Slice
* reader_contents
)
292 reader_contents_(reader_contents
),
295 const std::string
& contents() const { return contents_
; }
297 virtual Status
Truncate(uint64_t size
) override
{
298 contents_
.resize(static_cast<size_t>(size
));
301 virtual Status
Close() override
{ return Status::OK(); }
302 virtual Status
Flush() override
{
303 if (last_flush_
< contents_
.size()) {
304 assert(reader_contents_
->size() >= contents_
.size());
305 memcpy((char*)reader_contents_
->data() + last_flush_
,
306 contents_
.data() + last_flush_
, contents_
.size() - last_flush_
);
307 last_flush_
= contents_
.size();
311 virtual Status
Sync() override
{ return Status::OK(); }
312 virtual Status
Append(const Slice
& slice
) override
{
313 contents_
.append(slice
.data(), slice
.size());
316 void Drop(size_t bytes
) {
317 contents_
.resize(contents_
.size() - bytes
);
318 if (last_flush_
> contents_
.size()) last_flush_
= contents_
.size();
322 std::string contents_
;
323 Slice
* reader_contents_
;
327 class StringSource
: public RandomAccessFile
{
329 explicit StringSource(const Slice
& contents
, uint64_t uniq_id
= 0,
331 : contents_(contents
.data(), contents
.size()),
336 virtual ~StringSource() { }
338 uint64_t Size() const { return contents_
.size(); }
340 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
341 char* scratch
) const override
{
343 if (offset
> contents_
.size()) {
344 return Status::InvalidArgument("invalid Read offset");
346 if (offset
+ n
> contents_
.size()) {
347 n
= contents_
.size() - static_cast<size_t>(offset
);
350 memcpy(scratch
, &contents_
[static_cast<size_t>(offset
)], n
);
351 *result
= Slice(scratch
, n
);
353 *result
= Slice(&contents_
[static_cast<size_t>(offset
)], n
);
358 virtual size_t GetUniqueId(char* id
, size_t max_size
) const override
{
364 rid
= EncodeVarint64(rid
, uniq_id_
);
365 rid
= EncodeVarint64(rid
, 0);
366 return static_cast<size_t>(rid
-id
);
369 int total_reads() const { return total_reads_
; }
371 void set_total_reads(int tr
) { total_reads_
= tr
; }
374 std::string contents_
;
377 mutable int total_reads_
;
380 class NullLogger
: public Logger
{
383 virtual void Logv(const char* /*format*/, va_list /*ap*/) override
{}
384 virtual size_t GetLogFileSize() const override
{ return 0; }
387 // Corrupts key by changing the type
388 extern void CorruptKeyType(InternalKey
* ikey
);
390 extern std::string
KeyStr(const std::string
& user_key
,
391 const SequenceNumber
& seq
, const ValueType
& t
,
392 bool corrupt
= false);
394 class SleepingBackgroundTask
{
396 SleepingBackgroundTask()
399 done_with_sleep_(false),
403 MutexLock
l(&mutex_
);
407 MutexLock
l(&mutex_
);
410 while (should_sleep_
) {
414 done_with_sleep_
= true;
417 void WaitUntilSleeping() {
418 MutexLock
l(&mutex_
);
419 while (!sleeping_
|| !should_sleep_
) {
424 MutexLock
l(&mutex_
);
425 should_sleep_
= false;
428 void WaitUntilDone() {
429 MutexLock
l(&mutex_
);
430 while (!done_with_sleep_
) {
435 MutexLock
l(&mutex_
);
436 return should_sleep_
== false;
440 MutexLock
l(&mutex_
);
441 should_sleep_
= true;
442 done_with_sleep_
= false;
445 static void DoSleepTask(void* arg
) {
446 reinterpret_cast<SleepingBackgroundTask
*>(arg
)->DoSleep();
451 port::CondVar bg_cv_
; // Signalled when background work finishes
453 bool done_with_sleep_
;
457 // Filters merge operands and values that are equal to `num`.
458 class FilterNumber
: public CompactionFilter
{
460 explicit FilterNumber(uint64_t num
) : num_(num
) {}
462 std::string
last_merge_operand_key() { return last_merge_operand_key_
; }
464 bool Filter(int /*level*/, const rocksdb::Slice
& /*key*/,
465 const rocksdb::Slice
& value
, std::string
* /*new_value*/,
466 bool* /*value_changed*/) const override
{
467 if (value
.size() == sizeof(uint64_t)) {
468 return num_
== DecodeFixed64(value
.data());
473 bool FilterMergeOperand(int /*level*/, const rocksdb::Slice
& key
,
474 const rocksdb::Slice
& value
) const override
{
475 last_merge_operand_key_
= key
.ToString();
476 if (value
.size() == sizeof(uint64_t)) {
477 return num_
== DecodeFixed64(value
.data());
482 const char* Name() const override
{ return "FilterBadMergeOperand"; }
485 mutable std::string last_merge_operand_key_
;
489 inline std::string
EncodeInt(uint64_t x
) {
491 PutFixed64(&result
, x
);
495 class StringEnv
: public EnvWrapper
{
497 class SeqStringSource
: public SequentialFile
{
499 explicit SeqStringSource(const std::string
& data
)
500 : data_(data
), offset_(0) {}
501 ~SeqStringSource() {}
502 Status
Read(size_t n
, Slice
* result
, char* scratch
) override
{
504 if (offset_
< data_
.size()) {
505 n
= std::min(data_
.size() - offset_
, n
);
506 memcpy(scratch
, data_
.data() + offset_
, n
);
508 *result
= Slice(scratch
, n
);
510 return Status::InvalidArgument(
511 "Attemp to read when it already reached eof.");
515 Status
Skip(uint64_t n
) override
{
516 if (offset_
>= data_
.size()) {
517 return Status::InvalidArgument(
518 "Attemp to read when it already reached eof.");
520 // TODO(yhchiang): Currently doesn't handle the overflow case.
521 offset_
+= static_cast<size_t>(n
);
530 class StringSink
: public WritableFile
{
532 explicit StringSink(std::string
* contents
)
533 : WritableFile(), contents_(contents
) {}
534 virtual Status
Truncate(uint64_t size
) override
{
535 contents_
->resize(static_cast<size_t>(size
));
538 virtual Status
Close() override
{ return Status::OK(); }
539 virtual Status
Flush() override
{ return Status::OK(); }
540 virtual Status
Sync() override
{ return Status::OK(); }
541 virtual Status
Append(const Slice
& slice
) override
{
542 contents_
->append(slice
.data(), slice
.size());
547 std::string
* contents_
;
550 explicit StringEnv(Env
* t
) : EnvWrapper(t
) {}
551 virtual ~StringEnv() {}
553 const std::string
& GetContent(const std::string
& f
) { return files_
[f
]; }
555 const Status
WriteToNewFile(const std::string
& file_name
,
556 const std::string
& content
) {
557 std::unique_ptr
<WritableFile
> r
;
558 auto s
= NewWritableFile(file_name
, &r
, EnvOptions());
565 assert(files_
[file_name
] == content
);
569 // The following text is boilerplate that forwards all methods to target()
570 Status
NewSequentialFile(const std::string
& f
,
571 std::unique_ptr
<SequentialFile
>* r
,
572 const EnvOptions
& /*options*/) override
{
573 auto iter
= files_
.find(f
);
574 if (iter
== files_
.end()) {
575 return Status::NotFound("The specified file does not exist", f
);
577 r
->reset(new SeqStringSource(iter
->second
));
580 Status
NewRandomAccessFile(const std::string
& /*f*/,
581 std::unique_ptr
<RandomAccessFile
>* /*r*/,
582 const EnvOptions
& /*options*/) override
{
583 return Status::NotSupported();
585 Status
NewWritableFile(const std::string
& f
, std::unique_ptr
<WritableFile
>* r
,
586 const EnvOptions
& /*options*/) override
{
587 auto iter
= files_
.find(f
);
588 if (iter
!= files_
.end()) {
589 return Status::IOError("The specified file already exists", f
);
591 r
->reset(new StringSink(&files_
[f
]));
594 virtual Status
NewDirectory(const std::string
& /*name*/,
595 std::unique_ptr
<Directory
>* /*result*/) override
{
596 return Status::NotSupported();
598 Status
FileExists(const std::string
& f
) override
{
599 if (files_
.find(f
) == files_
.end()) {
600 return Status::NotFound();
604 Status
GetChildren(const std::string
& /*dir*/,
605 std::vector
<std::string
>* /*r*/) override
{
606 return Status::NotSupported();
608 Status
DeleteFile(const std::string
& f
) override
{
612 Status
CreateDir(const std::string
& /*d*/) override
{
613 return Status::NotSupported();
615 Status
CreateDirIfMissing(const std::string
& /*d*/) override
{
616 return Status::NotSupported();
618 Status
DeleteDir(const std::string
& /*d*/) override
{
619 return Status::NotSupported();
621 Status
GetFileSize(const std::string
& f
, uint64_t* s
) override
{
622 auto iter
= files_
.find(f
);
623 if (iter
== files_
.end()) {
624 return Status::NotFound("The specified file does not exist:", f
);
626 *s
= iter
->second
.size();
630 Status
GetFileModificationTime(const std::string
& /*fname*/,
631 uint64_t* /*file_mtime*/) override
{
632 return Status::NotSupported();
635 Status
RenameFile(const std::string
& /*s*/,
636 const std::string
& /*t*/) override
{
637 return Status::NotSupported();
640 Status
LinkFile(const std::string
& /*s*/, const std::string
& /*t*/) override
{
641 return Status::NotSupported();
644 Status
LockFile(const std::string
& /*f*/, FileLock
** /*l*/) override
{
645 return Status::NotSupported();
648 Status
UnlockFile(FileLock
* /*l*/) override
{ return Status::NotSupported(); }
651 std::unordered_map
<std::string
, std::string
> files_
;
654 // Randomly initialize the given DBOptions
655 void RandomInitDBOptions(DBOptions
* db_opt
, Random
* rnd
);
657 // Randomly initialize the given ColumnFamilyOptions
658 // Note that the caller is responsible for releasing non-null
659 // cf_opt->compaction_filter.
660 void RandomInitCFOptions(ColumnFamilyOptions
* cf_opt
, Random
* rnd
);
662 // A dummy merge operator which can change its name
663 class ChanglingMergeOperator
: public MergeOperator
{
665 explicit ChanglingMergeOperator(const std::string
& name
)
666 : name_(name
+ "MergeOperator") {}
667 ~ChanglingMergeOperator() {}
669 void SetName(const std::string
& name
) { name_
= name
; }
671 virtual bool FullMergeV2(const MergeOperationInput
& /*merge_in*/,
672 MergeOperationOutput
* /*merge_out*/) const override
{
675 virtual bool PartialMergeMulti(const Slice
& /*key*/,
676 const std::deque
<Slice
>& /*operand_list*/,
677 std::string
* /*new_value*/,
678 Logger
* /*logger*/) const override
{
681 virtual const char* Name() const override
{ return name_
.c_str(); }
687 // Returns a dummy merge operator with random name.
688 MergeOperator
* RandomMergeOperator(Random
* rnd
);
690 // A dummy compaction filter which can change its name
691 class ChanglingCompactionFilter
: public CompactionFilter
{
693 explicit ChanglingCompactionFilter(const std::string
& name
)
694 : name_(name
+ "CompactionFilter") {}
695 ~ChanglingCompactionFilter() {}
697 void SetName(const std::string
& name
) { name_
= name
; }
699 bool Filter(int /*level*/, const Slice
& /*key*/,
700 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
701 bool* /*value_changed*/) const override
{
705 const char* Name() const override
{ return name_
.c_str(); }
711 // Returns a dummy compaction filter with a random name.
712 CompactionFilter
* RandomCompactionFilter(Random
* rnd
);
714 // A dummy compaction filter factory which can change its name
715 class ChanglingCompactionFilterFactory
: public CompactionFilterFactory
{
717 explicit ChanglingCompactionFilterFactory(const std::string
& name
)
718 : name_(name
+ "CompactionFilterFactory") {}
719 ~ChanglingCompactionFilterFactory() {}
721 void SetName(const std::string
& name
) { name_
= name
; }
723 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
724 const CompactionFilter::Context
& /*context*/) override
{
725 return std::unique_ptr
<CompactionFilter
>();
728 // Returns a name that identifies this compaction filter factory.
729 const char* Name() const override
{ return name_
.c_str(); }
735 CompressionType
RandomCompressionType(Random
* rnd
);
737 void RandomCompressionTypeVector(const size_t count
,
738 std::vector
<CompressionType
>* types
,
741 CompactionFilterFactory
* RandomCompactionFilterFactory(Random
* rnd
);
743 const SliceTransform
* RandomSliceTransform(Random
* rnd
, int pre_defined
= -1);
745 TableFactory
* RandomTableFactory(Random
* rnd
, int pre_defined
= -1);
747 std::string
RandomName(Random
* rnd
, const size_t len
);
749 Status
DestroyDir(Env
* env
, const std::string
& dir
);
751 bool IsDirectIOSupported(Env
* env
, const std::string
& dir
);
754 } // namespace rocksdb