// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
namespace rocksdb {
-Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
- ColumnFamilyHandle* column_family,
- const std::string& key,
- SequenceNumber key_seq,
- bool cache_only) {
+Status TransactionUtil::CheckKeyForConflicts(
+ DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
+ SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker) {
Status result;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
SequenceNumber earliest_seq =
db_impl->GetEarliestMemTableSequenceNumber(sv, true);
- result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
+ result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only,
+ snap_checker);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
}
Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq,
- SequenceNumber key_seq, const std::string& key,
- bool cache_only) {
+ SequenceNumber snap_seq,
+ const std::string& key, bool cache_only,
+ ReadCallback* snap_checker) {
Status result;
bool need_to_read_sst = false;
result = Status::TryAgain(
"Transaction ould not check for conflicts as the MemTable does not "
"countain a long enough history to check write at SequenceNumber: ",
- ToString(key_seq));
+ ToString(snap_seq));
}
- } else if (key_seq < earliest_seq) {
+ } else if (snap_seq < earliest_seq) {
need_to_read_sst = true;
if (cache_only) {
"max_write_buffer_number_to_maintain option could reduce the "
"frequency "
"of this error.",
- key_seq, earliest_seq);
+ snap_seq, earliest_seq);
result = Status::TryAgain(msg);
}
}
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
result = s;
- } else if (found_record_for_key && (seq > key_seq)) {
- // Write Conflict
- result = Status::Busy();
+ } else if (found_record_for_key) {
+ bool write_conflict = snap_checker == nullptr
+ ? snap_seq < seq
+ : !snap_checker->IsVisible(seq);
+ if (write_conflict) {
+ result = Status::Busy();
+ }
}
}