// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/write_batch_with_index.h"
-#include <limits>
#include <memory>
#include "db/column_family.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "util/arena.h"
+#include "util/cast_util.h"
+#include "util/string_util.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace rocksdb {
void Next() override {
if (!Valid()) {
status_ = Status::NotSupported("Next() on invalid iterator");
+ return;
}
if (!forward_) {
void Prev() override {
if (!Valid()) {
status_ = Status::NotSupported("Prev() on invalid iterator");
+ return;
}
if (forward_) {
private:
void AssertInvariants() {
#ifndef NDEBUG
+ bool not_ok = false;
+ if (!base_iterator_->status().ok()) {
+ assert(!base_iterator_->Valid());
+ not_ok = true;
+ }
+ if (!delta_iterator_->status().ok()) {
+ assert(!delta_iterator_->Valid());
+ not_ok = true;
+ }
+ if (not_ok) {
+ assert(!Valid());
+ assert(!status().ok());
+ return;
+ }
+
if (!Valid()) {
return;
}
void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
+ status_ = Status::OK();
while (true) {
WriteEntry delta_entry;
if (DeltaValid()) {
+ assert(delta_iterator_->status().ok());
delta_entry = delta_iterator_->Entry();
+ } else if (!delta_iterator_->status().ok()) {
+ // Expose the error status and stop.
+ current_at_base_ = false;
+ return;
}
equal_keys_ = false;
if (!BaseValid()) {
+ if (!base_iterator_->status().ok()) {
+ // Expose the error status and stop.
+ current_at_base_ = true;
+ return;
+ }
+
// Base has finished.
if (!DeltaValid()) {
// Finished
}
virtual void SeekToFirst() override {
- WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
- column_family_id_, 0, 0);
+ WriteBatchIndexEntry search_entry(
+ nullptr /* search_key */, column_family_id_,
+ true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
virtual void SeekToLast() override {
- WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
- column_family_id_ + 1, 0, 0);
+ WriteBatchIndexEntry search_entry(
+ nullptr /* search_key */, column_family_id_ + 1,
+ true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast();
}
virtual void Seek(const Slice& key) override {
- WriteBatchIndexEntry search_entry(&key, column_family_id_);
+ WriteBatchIndexEntry search_entry(&key, column_family_id_,
+ true /* is_forward_direction */,
+ false /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}
virtual void SeekForPrev(const Slice& key) override {
- WriteBatchIndexEntry search_entry(&key, column_family_id_);
+ WriteBatchIndexEntry search_entry(&key, column_family_id_,
+ false /* is_forward_direction */,
+ false /* is_seek_to_first */);
skip_list_iter_.SeekForPrev(&search_entry);
}
};
struct WriteBatchWithIndex::Rep {
- Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
- size_t max_bytes = 0, bool _overwrite_key = false)
+ explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
+ size_t max_bytes = 0, bool _overwrite_key = false)
: write_batch(reserved_bytes, max_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena),
overwrite_key(_overwrite_key),
- last_entry_offset(0) {}
+ last_entry_offset(0),
+ last_sub_batch_offset(0),
+ sub_batch_cnt(1) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;
bool overwrite_key;
size_t last_entry_offset;
+ // The starting offset of the last sub-batch. A sub-batch starts right before
+ // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
+ // the default, means that no duplicate key is detected so far.
+ size_t last_sub_batch_offset;
+ // Total number of sub-batches in the write batch. Default is 1.
+ size_t sub_batch_cnt;
// Remember current offset of internal write batch, which is used as
// the starting offset of the next record.
}
WriteBatchIndexEntry* non_const_entry =
const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
+ if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
+ last_sub_batch_offset = last_entry_offset;
+ sub_batch_cnt++;
+ }
non_const_entry->offset = last_entry_offset;
return true;
}
wb_data.size() - last_entry_offset);
// Extract key
Slice key;
- bool success __attribute__((__unused__)) =
+ bool success __attribute__((__unused__));
+ success =
ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
assert(success);
- auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
- auto* index_entry =
- new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
- key.data() - wb_data.data(), key.size());
- skip_list.Insert(index_entry);
- }
-
- void WriteBatchWithIndex::Rep::Clear() {
- write_batch.Clear();
- ClearIndex();
- }
+ auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
+ auto* index_entry =
+ new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
+ key.data() - wb_data.data(), key.size());
+ skip_list.Insert(index_entry);
+}
- void WriteBatchWithIndex::Rep::ClearIndex() {
- skip_list.~WriteBatchEntrySkipList();
- arena.~Arena();
- new (&arena) Arena();
- new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
- last_entry_offset = 0;
- }
+void WriteBatchWithIndex::Rep::Clear() {
+ write_batch.Clear();
+ ClearIndex();
+}
- Status WriteBatchWithIndex::Rep::ReBuildIndex() {
- Status s;
+void WriteBatchWithIndex::Rep::ClearIndex() {
+ skip_list.~WriteBatchEntrySkipList();
+ arena.~Arena();
+ new (&arena) Arena();
+ new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
+ last_entry_offset = 0;
+ last_sub_batch_offset = 0;
+ sub_batch_cnt = 1;
+}
- ClearIndex();
+Status WriteBatchWithIndex::Rep::ReBuildIndex() {
+ Status s;
- if (write_batch.Count() == 0) {
- // Nothing to re-index
- return s;
- }
+ ClearIndex();
- size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
+ if (write_batch.Count() == 0) {
+ // Nothing to re-index
+ return s;
+ }
- Slice input(write_batch.Data());
- input.remove_prefix(offset);
+ size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
- // Loop through all entries in Rep and add each one to the index
- int found = 0;
- while (s.ok() && !input.empty()) {
- Slice key, value, blob, xid;
- uint32_t column_family_id = 0; // default
- char tag = 0;
+ Slice input(write_batch.Data());
+ input.remove_prefix(offset);
- // set offset of current entry for call to AddNewEntry()
- last_entry_offset = input.data() - write_batch.Data().data();
+ // Loop through all entries in Rep and add each one to the index
+ int found = 0;
+ while (s.ok() && !input.empty()) {
+ Slice key, value, blob, xid;
+ uint32_t column_family_id = 0; // default
+ char tag = 0;
- s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
- &value, &blob, &xid);
- if (!s.ok()) {
- break;
- }
+ // set offset of current entry for call to AddNewEntry()
+ last_entry_offset = input.data() - write_batch.Data().data();
- switch (tag) {
- case kTypeColumnFamilyValue:
- case kTypeValue:
- case kTypeColumnFamilyDeletion:
- case kTypeDeletion:
- case kTypeColumnFamilySingleDeletion:
- case kTypeSingleDeletion:
- case kTypeColumnFamilyMerge:
- case kTypeMerge:
- found++;
- if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
- AddNewEntry(column_family_id);
- }
- break;
- case kTypeLogData:
- case kTypeBeginPrepareXID:
- case kTypeEndPrepareXID:
- case kTypeCommitXID:
- case kTypeRollbackXID:
- case kTypeNoop:
- break;
- default:
- return Status::Corruption("unknown WriteBatch tag");
- }
+ s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
+ &value, &blob, &xid);
+ if (!s.ok()) {
+ break;
}
- if (s.ok() && found != write_batch.Count()) {
- s = Status::Corruption("WriteBatch has wrong count");
+ switch (tag) {
+ case kTypeColumnFamilyValue:
+ case kTypeValue:
+ case kTypeColumnFamilyDeletion:
+ case kTypeDeletion:
+ case kTypeColumnFamilySingleDeletion:
+ case kTypeSingleDeletion:
+ case kTypeColumnFamilyMerge:
+ case kTypeMerge:
+ found++;
+ if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
+ AddNewEntry(column_family_id);
+ }
+ break;
+ case kTypeLogData:
+ case kTypeBeginPrepareXID:
+ case kTypeBeginPersistedPrepareXID:
+ case kTypeBeginUnprepareXID:
+ case kTypeEndPrepareXID:
+ case kTypeCommitXID:
+ case kTypeRollbackXID:
+ case kTypeNoop:
+ break;
+ default:
+ return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
+ ToString(static_cast<unsigned int>(tag)));
}
+ }
- return s;
+ if (s.ok() && found != write_batch.Count()) {
+ s = Status::Corruption("WriteBatch has wrong count");
}
- WriteBatchWithIndex::WriteBatchWithIndex(
- const Comparator* default_index_comparator, size_t reserved_bytes,
- bool overwrite_key, size_t max_bytes)
- : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
- overwrite_key)) {}
+ return s;
+}
+
+WriteBatchWithIndex::WriteBatchWithIndex(
+ const Comparator* default_index_comparator, size_t reserved_bytes,
+ bool overwrite_key, size_t max_bytes)
+ : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
+ overwrite_key)) {}
- WriteBatchWithIndex::~WriteBatchWithIndex() {}
+WriteBatchWithIndex::~WriteBatchWithIndex() {}
- WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
+WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
- WBWIIterator* WriteBatchWithIndex::NewIterator() {
- return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
+size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
+
+WBWIIterator* WriteBatchWithIndex::NewIterator() {
+ return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}
WBWIIterator* WriteBatchWithIndex::NewIterator(
const ReadOptions& read_options,
const Slice& key,
std::string* value) {
+ assert(value != nullptr);
+ PinnableSlice pinnable_val(value);
+ assert(!pinnable_val.IsPinned());
+ auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
+ &pinnable_val);
+ if (s.ok() && pinnable_val.IsPinned()) {
+ value->assign(pinnable_val.data(), pinnable_val.size());
+ } // else value is already assigned
+ return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+ const ReadOptions& read_options,
+ const Slice& key,
+ PinnableSlice* pinnable_val) {
return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
- value);
+ pinnable_val);
}
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
ColumnFamilyHandle* column_family,
const Slice& key,
std::string* value) {
+ assert(value != nullptr);
+ PinnableSlice pinnable_val(value);
+ assert(!pinnable_val.IsPinned());
+ auto s =
+ GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
+ if (s.ok() && pinnable_val.IsPinned()) {
+ value->assign(pinnable_val.data(), pinnable_val.size());
+ } // else value is already assigned
+ return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+ const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key,
+ PinnableSlice* pinnable_val) {
+ return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
+ nullptr);
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(
+ DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
Status s;
MergeContext merge_context;
const ImmutableDBOptions& immuable_db_options =
- reinterpret_cast<DBImpl*>(db)->immutable_db_options();
+ static_cast_with_check<DBImpl, DB>(db->GetRootDB())
+ ->immutable_db_options();
- std::string batch_value;
+ // Since the lifetime of the WriteBatch is the same as that of the transaction
+ // we cannot pin it as otherwise the returned value will not be available
+ // after the transaction finishes.
+ std::string& batch_value = *pinnable_val->GetSelf();
WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(
immuable_db_options, this, column_family, key, &merge_context,
&rep->comparator, &batch_value, rep->overwrite_key, &s);
if (result == WriteBatchWithIndexInternal::Result::kFound) {
- value->assign(batch_value.data(), batch_value.size());
+ pinnable_val->PinSelf();
return s;
}
if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
result == WriteBatchWithIndexInternal::Result::kNotFound);
// Did not find key in batch OR could not resolve Merges. Try DB.
- s = db->Get(read_options, column_family, key, value);
+ if (!callback) {
+ s = db->Get(read_options, column_family, key, pinnable_val);
+ } else {
+ s = static_cast_with_check<DBImpl, DB>(db->GetRootDB())
+ ->GetImpl(read_options, column_family, key, pinnable_val, nullptr,
+ callback);
+ }
if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
- Slice db_slice(*value);
Slice* merge_data;
if (s.ok()) {
- merge_data = &db_slice;
+ merge_data = pinnable_val;
} else { // Key not present in db (s.IsNotFound())
merge_data = nullptr;
}
if (merge_operator) {
- s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data,
- merge_context.GetOperands(), value,
- logger, statistics, env);
+ s = MergeHelper::TimedFullMerge(
+ merge_operator, key, merge_data, merge_context.GetOperands(),
+ pinnable_val->GetSelf(), logger, statistics, env);
+ pinnable_val->PinSelf();
} else {
s = Status::InvalidArgument("Options::merge_operator must be set");
}
Status s = rep->write_batch.RollbackToSavePoint();
if (s.ok()) {
+ rep->sub_batch_cnt = 1;
+ rep->last_sub_batch_offset = 0;
s = rep->ReBuildIndex();
}
return s;
}
+Status WriteBatchWithIndex::PopSavePoint() {
+ return rep->write_batch.PopSavePoint();
+}
+
void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
rep->write_batch.SetMaxBytes(max_bytes);
}
+size_t WriteBatchWithIndex::GetDataSize() const {
+ return rep->write_batch.GetDataSize();
+}
+
} // namespace rocksdb
#endif // !ROCKSDB_LITE