1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/transaction_util.h"
14 #include "db/db_impl/db_impl.h"
15 #include "rocksdb/status.h"
16 #include "rocksdb/utilities/write_batch_with_index.h"
17 #include "util/cast_util.h"
18 #include "util/string_util.h"
20 namespace ROCKSDB_NAMESPACE
{
// Checks a single `key` of `column_family` for write conflicts.
//
// Visible flow: downcast the handle to ColumnFamilyHandleImpl to reach its
// column family data, take a reference on a SuperVersion, read the earliest
// memtable sequence number, delegate the actual decision to CheckKey(), and
// finally hand the SuperVersion back to `db_impl`.
//
// All arguments other than `column_family` are forwarded unchanged to
// CheckKey().
//
// NOTE(review): several statements of this function are not visible in this
// view (e.g. the declaration of `result`, the condition guarding the
// InvalidArgument assignment, and the return statement) -- confirm against
// the complete file before relying on this summary.
22 Status
TransactionUtil::CheckKeyForConflicts(
23 DBImpl
* db_impl
, ColumnFamilyHandle
* column_family
, const std::string
& key
,
24 SequenceNumber snap_seq
, const std::string
* const read_ts
, bool cache_only
,
25 ReadCallback
* snap_checker
, SequenceNumber min_uncommitted
) {
// Resolve the public handle to the internal column family data.
28 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
29 auto cfd
= cfh
->cfd();
// Take a referenced SuperVersion; it is handed back at the end of the
// function via ReturnAndCleanupSuperVersion().
30 SuperVersion
* sv
= db_impl
->GetAndRefSuperVersion(cfd
);
// NOTE(review): presumably the error path for a SuperVersion that could not
// be obtained; the guarding condition is not visible here -- confirm.
33 result
= Status::InvalidArgument("Could not access column family " +
// Earliest memtable sequence number for this SuperVersion; CheckKey() uses it
// to decide whether the memtable history is sufficient.
38 SequenceNumber earliest_seq
=
39 db_impl
->GetEarliestMemTableSequenceNumber(sv
, true);
// Delegate the conflict decision itself to CheckKey().
41 result
= CheckKey(db_impl
, sv
, earliest_seq
, snap_seq
, key
, read_ts
,
42 cache_only
, snap_checker
, min_uncommitted
);
// Hand the SuperVersion obtained above back to the DB.
44 db_impl
->ReturnAndCleanupSuperVersion(cfd
, sv
);
// Decides whether `key` has been written in a way that conflicts with the
// calling transaction, using the already-referenced SuperVersion `sv`.
//
// Parameters (as used by the visible code):
//   earliest_seq     - earliest memtable sequence number for `sv`;
//                      kMaxSequenceNumber means it is unknown.
//   snap_seq         - sequence number the transaction's operation is
//                      checked against.
//   read_ts          - optional read timestamp; when non-null a
//                      timestamp-based conflict check is also performed.
//   cache_only       - NOTE(review): presumably restricts the check to the
//                      memtables; the statements that test it are not visible
//                      in this view -- confirm.
//   snap_checker     - optional visibility callback; must be non-null
//                      whenever `min_uncommitted` is provided.
//   min_uncommitted  - kMaxSequenceNumber when not provided.
//
// Visible outcomes: `result` is set to Status::TryAgain when the memtable
// history is unknown or too recent, and to Status::Busy when a write conflict
// is detected.
//
// NOTE(review): several statements of this function are not visible in this
// view (e.g. the declaration of `result`, the guards around the TryAgain
// assignments, the body of the error branch after GetLatestSequenceForKey()
// and the return statement) -- confirm against the complete file.
50 Status
TransactionUtil::CheckKey(DBImpl
* db_impl
, SuperVersion
* sv
,
51 SequenceNumber earliest_seq
,
52 SequenceNumber snap_seq
,
53 const std::string
& key
,
54 const std::string
* const read_ts
,
55 bool cache_only
, ReadCallback
* snap_checker
,
56 SequenceNumber min_uncommitted
) {
57 // When `min_uncommitted` is provided, keys are not always committed
58 // in sequence number order, and `snap_checker` is used to check whether
59 // a specific sequence number in the database is visible to the transaction.
60 // So `snap_checker` must be provided.
61 assert(min_uncommitted
== kMaxSequenceNumber
|| snap_checker
!= nullptr);
64 bool need_to_read_sst
= false;
66 // Since it would be too slow to check the SST files, we will only use
67 // the memtables to check whether there have been any recent writes
68 // to this key after it was accessed in this transaction. But if the
69 // Memtables do not contain a long enough history, we must fail the
71 if (earliest_seq
== kMaxSequenceNumber
) {
72 // The age of this memtable is unknown. Cannot rely on it to check
73 // for recent writes. This error shouldn't happen often in practice as
74 // the Memtable should have a valid earliest sequence number except in some
75 // corner cases (such as error cases during recovery).
76 need_to_read_sst
= true;
79 result
= Status::TryAgain(
80 "Transaction could not check for conflicts as the MemTable does not "
81 "contain a long enough history to check write at SequenceNumber: ",
82 std::to_string(snap_seq
));
84 } else if (snap_seq
< earliest_seq
|| min_uncommitted
<= earliest_seq
) {
85 // Use <= for min_uncommitted since earliest_seq is actually the largest seq
86 // before this memtable was created
87 need_to_read_sst
= true;
90 // The age of this memtable is too new to use to check for recent
93 snprintf(msg
, sizeof(msg
),
94 "Transaction could not check for conflicts for operation at "
95 "SequenceNumber %" PRIu64
96 " as the MemTable only contains changes newer than "
97 "SequenceNumber %" PRIu64
98 ". Increasing the value of the "
99 "max_write_buffer_size_to_maintain option could reduce the "
102 snap_seq
, earliest_seq
);
103 result
= Status::TryAgain(msg
);
108 SequenceNumber seq
= kMaxSequenceNumber
;
109 std::string timestamp
;
110 bool found_record_for_key
= false;
112 // When min_uncommitted == kMaxSequenceNumber, writes are committed in
113 // sequence number order, so only keys larger than `snap_seq` can cause
115 // When min_uncommitted != kMaxSequenceNumber, keys lower than
116 // min_uncommitted will not trigger conflicts, while keys larger than
117 // min_uncommitted might create conflicts, so we need to read them out
118 // from the DB, and call callback to snap_checker to determine. So only
119 // keys lower than min_uncommitted can be skipped.
120 SequenceNumber lower_bound_seq
=
121 (min_uncommitted
== kMaxSequenceNumber
) ? snap_seq
: min_uncommitted
;
// Look up the latest sequence number recorded for `key`. The third argument
// is the negation of `need_to_read_sst`, and the timestamp out-parameter is
// only requested when the caller supplied a read timestamp.
122 Status s
= db_impl
->GetLatestSequenceForKey(
123 sv
, key
, !need_to_read_sst
, lower_bound_seq
, &seq
,
124 !read_ts
? nullptr : &timestamp
, &found_record_for_key
,
125 /*is_blob_index=*/nullptr);
// OK, NotFound and MergeInProgress are all acceptable lookup outcomes; any
// other status takes the first branch (its body is not visible in this view).
127 if (!(s
.ok() || s
.IsNotFound() || s
.IsMergeInProgress())) {
129 } else if (found_record_for_key
) {
130 bool write_conflict
= snap_checker
== nullptr
132 : !snap_checker
->IsVisible(seq
);
133 // Perform conflict checking based on timestamp if applicable.
134 if (!write_conflict
&& read_ts
!= nullptr) {
135 ColumnFamilyData
* cfd
= sv
->cfd
;
137 const Comparator
* const ucmp
= cfd
->user_comparator();
139 assert(read_ts
->size() == ucmp
->timestamp_size());
140 assert(read_ts
->size() == timestamp
.size());
141 // Write conflict if *read_ts < timestamp.
142 write_conflict
= ucmp
->CompareTimestamp(*read_ts
, timestamp
) < 0;
144 if (write_conflict
) {
145 result
= Status::Busy();
// Checks every key recorded in `tracker` for write conflicts.
//
// Visible flow: walk the column families known to `tracker`; for each one
// take a referenced SuperVersion and read its earliest memtable sequence
// number, then walk that column family's keys and call CheckKey() for each
// key with the sequence number stored in its PointLockStatus. The
// SuperVersion is handed back to `db_impl` afterwards.
//
// NOTE(review): parts of this function are not visible in this view (e.g. the
// remaining parameters -- `cache_only` is used below but its declaration is
// not shown -- the declaration of `result`, the condition guarding the
// InvalidArgument assignment, any early exit on a failed check, and the
// return statement) -- confirm against the complete file.
153 Status
TransactionUtil::CheckKeysForConflicts(DBImpl
* db_impl
,
154 const LockTracker
& tracker
,
// Iterate over the column families tracked by `tracker`.
158 std::unique_ptr
<LockTracker::ColumnFamilyIterator
> cf_it(
159 tracker
.GetColumnFamilyIterator());
160 assert(cf_it
!= nullptr);
161 while (cf_it
->HasNext()) {
162 ColumnFamilyId cf
= cf_it
->Next();
// Take a referenced SuperVersion for this column family id; it is handed back
// below via ReturnAndCleanupSuperVersion().
164 SuperVersion
* sv
= db_impl
->GetAndRefSuperVersion(cf
);
// NOTE(review): presumably the error path for a SuperVersion that could not
// be obtained; the guarding condition is not visible here -- confirm.
166 result
= Status::InvalidArgument("Could not access column family " +
171 SequenceNumber earliest_seq
=
172 db_impl
->GetEarliestMemTableSequenceNumber(sv
, true);
174 // For each of the keys in this transaction, check to see if someone has
175 // written to this key since the start of the transaction.
176 std::unique_ptr
<LockTracker::KeyIterator
> key_it(
177 tracker
.GetKeyIterator(cf
));
178 assert(key_it
!= nullptr);
179 while (key_it
->HasNext()) {
180 const std::string
& key
= key_it
->Next();
// The sequence number stored in the key's lock status is the one the key is
// checked against.
181 PointLockStatus status
= tracker
.GetPointLockStatus(cf
, key
);
182 const SequenceNumber key_seq
= status
.seq
;
184 // TODO: support timestamp-based conflict checking.
185 // CheckKeysForConflicts() is currently used only by optimistic
187 result
= CheckKey(db_impl
, sv
, earliest_seq
, key_seq
, key
,
188 /*read_ts=*/nullptr, cache_only
);
// Hand the SuperVersion obtained for this column family back to the DB.
194 db_impl
->ReturnAndCleanupSuperVersion(cf
, sv
);
204 } // namespace ROCKSDB_NAMESPACE
206 #endif // ROCKSDB_LITE