#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
+#include "port/lang.h"
#include "rocksdb/merge_operator.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
-#include "util/util.h"
namespace ROCKSDB_NAMESPACE {
auto rv = content_flags_.load(std::memory_order_relaxed);
if ((rv & ContentFlags::DEFERRED) != 0) {
BatchContentClassifier classifier;
- Iterate(&classifier);
+ // Should we handle status here?
+ Iterate(&classifier).PermitUncheckedError();
rv = classifier.content_flags;
// this method is conceptually const, because it is performing a lazy
case kTypeBeginPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
- handler->MarkBeginPrepare();
+ s = handler->MarkBeginPrepare();
+ assert(s.ok());
empty_batch = false;
if (!handler->WriteAfterCommit()) {
s = Status::NotSupported(
case kTypeBeginPersistedPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
- handler->MarkBeginPrepare();
+ s = handler->MarkBeginPrepare();
+ assert(s.ok());
empty_batch = false;
if (handler->WriteAfterCommit()) {
s = Status::NotSupported(
case kTypeBeginUnprepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
- handler->MarkBeginPrepare(true /* unprepared */);
+ s = handler->MarkBeginPrepare(true /* unprepared */);
+ assert(s.ok());
empty_batch = false;
if (handler->WriteAfterCommit()) {
s = Status::NotSupported(
case kTypeEndPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
- handler->MarkEndPrepare(xid);
+ s = handler->MarkEndPrepare(xid);
+ assert(s.ok());
empty_batch = true;
break;
case kTypeCommitXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
- handler->MarkCommit(xid);
+ s = handler->MarkCommit(xid);
+ assert(s.ok());
empty_batch = true;
break;
case kTypeRollbackXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
- handler->MarkRollback(xid);
+ s = handler->MarkRollback(xid);
+ assert(s.ok());
empty_batch = true;
break;
case kTypeNoop:
- handler->MarkNoop(empty_batch);
+ s = handler->MarkNoop(empty_batch);
+ assert(s.ok());
empty_batch = true;
break;
default:
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
- PutLengthPrefixedSlice(&b->rep_, key);
+ if (0 == b->timestamp_size_) {
+ PutLengthPrefixedSlice(&b->rep_, key);
+ } else {
+ PutVarint32(&b->rep_,
+ static_cast<uint32_t>(key.size() + b->timestamp_size_));
+ b->rep_.append(key.data(), key.size());
+ b->rep_.append(b->timestamp_size_, '\0');
+ }
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
- PutLengthPrefixedSliceParts(&b->rep_, key);
+ if (0 == b->timestamp_size_) {
+ PutLengthPrefixedSliceParts(&b->rep_, key);
+ } else {
+ PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
+ }
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
ignore_missing_column_families_(ignore_missing_column_families),
recovering_log_number_(recovering_log_number),
log_number_ref_(0),
- db_(static_cast_with_check<DBImpl, DB>(db)),
+ db_(static_cast_with_check<DBImpl>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
post_info_created_(false),
has_valid_writes_(has_valid_writes),
const Slice& value, ValueType value_type) {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
- WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
- return Status::OK();
+ return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
+ value);
// else insert the values to the memtable right away
}
- Status seek_status;
- if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+ Status ret_status;
+ if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
- WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+ ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
+ key, value);
+ assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
}
MaybeAdvanceSeq(batch_boundry);
- return seek_status;
+ return ret_status;
}
- Status ret_status;
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
- WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+ ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
+ key, value);
+ assert(ret_status.ok());
}
// Since all Puts are logged in transaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
+    // Handles a point delete while replaying/inserting a WriteBatch.
+    // This change stops discarding the Status from WriteBatchInternal::Delete()
+    // and instead propagates it to the caller (previously Status::OK() was
+    // returned unconditionally).
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
- WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
- return Status::OK();
+ return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
// else insert the values to the memtable right away
}
+    // Renamed seek_status -> ret_status: the same variable now carries both
+    // the column-family seek failure and any rebuild-trx append failure.
- Status seek_status;
- if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+ Status ret_status;
+ if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
- WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+ ret_status =
+ WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+ assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
}
MaybeAdvanceSeq(batch_boundry);
- return seek_status;
+ return ret_status;
}
+    // New: when the CF's user comparator declares a nonzero timestamp size,
+    // encode the tombstone as kTypeDeletionWithTimestamp so the key can be
+    // padded with timestamp bytes downstream; otherwise keep kTypeDeletion.
- auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
+ ColumnFamilyData* cfd = cf_mems_->current();
+ assert(!cfd || cfd->user_comparator());
+ const size_t ts_sz = (cfd && cfd->user_comparator())
+ ? cfd->user_comparator()->timestamp_size()
+ : 0;
+ const ValueType delete_type =
+ (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
+ ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the key to
// the rebuilding transaction object.
- WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
+ ret_status =
+ WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
}
return ret_status;
}
Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
+    // Handles a single-delete while replaying/inserting a WriteBatch.
+    // Mirrors the DeleteCF change: the Status from
+    // WriteBatchInternal::SingleDelete() is now propagated instead of being
+    // silently dropped in favor of Status::OK().
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
- WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
- return Status::OK();
+ return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
+ key);
// else insert the values to the memtable right away
}
+    // Renamed seek_status -> ret_status: one Status now reports both the
+    // column-family seek outcome and the rebuild-trx append outcome.
- Status seek_status;
- if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+ Status ret_status;
+ if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
- WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
- key);
+ ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
+ column_family_id, key);
+ assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
}
MaybeAdvanceSeq(batch_boundry);
- return seek_status;
+ return ret_status;
}
+    // Reuses the ret_status declared above rather than shadowing it with a
+    // new `auto` local.
- auto ret_status =
+ ret_status =
DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the key to
// the rebuilding transaction object.
- WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
+ ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
+ column_family_id, key);
}
return ret_status;
}
const Slice& end_key) override {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
- WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
- begin_key, end_key);
- return Status::OK();
+ return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
+ begin_key, end_key);
// else insert the values to the memtable right away
}
- Status seek_status;
- if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+ Status ret_status;
+ if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
- WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
- begin_key, end_key);
+ ret_status = WriteBatchInternal::DeleteRange(
+ rebuilding_trx_, column_family_id, begin_key, end_key);
+ assert(ret_status.ok());
// TODO(myabandeh): when transactional DeleteRange support is added,
// check if end_key must also be added.
batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
}
MaybeAdvanceSeq(batch_boundry);
- return seek_status;
+ return ret_status;
}
if (db_ != nullptr) {
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
- auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
if (!cfd->is_delete_range_supported()) {
return Status::NotSupported(
std::string("DeleteRange not supported for table type ") +
cfd->ioptions()->table_factory->Name() + " in CF " +
cfd->GetName());
}
+ int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
+ if (cmp > 0) {
+ // It's an empty range where endpoints appear mistaken. Don't bother
+ // applying it to the DB, and return an error to the user.
+ return Status::InvalidArgument("end key comes before start key");
+ } else if (cmp == 0) {
+ // It's an empty range. Don't bother applying it to the DB.
+ return Status::OK();
+ }
}
- auto ret_status =
+ ret_status =
DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
- WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
- begin_key, end_key);
+ ret_status = WriteBatchInternal::DeleteRange(
+ rebuilding_trx_, column_family_id, begin_key, end_key);
}
return ret_status;
}
const Slice& value) override {
// optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
- WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
- return Status::OK();
+ return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
+ value);
// else insert the values to the memtable right away
}
- Status seek_status;
- if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
+ Status ret_status;
+ if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
- WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
- value);
+ ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
+ column_family_id, key, value);
+ assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
}
MaybeAdvanceSeq(batch_boundry);
- return seek_status;
+ return ret_status;
}
- Status ret_status;
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
bool perform_merge = false;
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
- WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
+ ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
+ key, value);
+ assert(ret_status.ok());
}
MaybeAdvanceSeq();
CheckMemtableFull();