1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#pragma once

#ifndef ROCKSDB_LITE

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
15 namespace ROCKSDB_NAMESPACE
{
// Forward declarations so the classes below can name each other by pointer
// or reference without requiring the full definitions first.
class WriteUnpreparedTxn;
class WriteUnpreparedTxnDB;
// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may be flushed to DB already, we cannot
// rely on the contents of WriteBatchWithIndex to determine whether a key should
// be visible or not, so we have to remember to check the DB for any uncommitted
// keys that should be visible to us. First, we will need to change the seek to
// snapshot logic, to seek to max_visible_seq = max(snap_seq, max_unprep_seq).
// Any key greater than max_visible_seq should not be visible because they
// cannot be unprepared by the current transaction and they are not in its
// snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
//
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning if the same transaction is adding more unprep seqs through
// writes during iteration, these newer writes may not be visible. This is not a
// problem for MySQL though because it avoids modifying the index as it is
// scanning through it to avoid the Halloween Problem. Instead, it scans the
// index once up front, and modifies based on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible key,
// and special precautions must be taken to avoid performing another reseek,
// leading to an infinite loop.
//
54 class WriteUnpreparedTxnReadCallback
: public ReadCallback
{
56 WriteUnpreparedTxnReadCallback(
57 WritePreparedTxnDB
* db
, SequenceNumber snapshot
,
58 SequenceNumber min_uncommitted
,
59 const std::map
<SequenceNumber
, size_t>& unprep_seqs
,
60 SnapshotBackup backed_by_snapshot
)
61 // Pass our last uncommitted seq as the snapshot to the parent class to
62 // ensure that the parent will not prematurely filter out own writes. We
63 // will do the exact comparison against snapshots in IsVisibleFullCheck
65 : ReadCallback(CalcMaxVisibleSeq(unprep_seqs
, snapshot
), min_uncommitted
),
67 unprep_seqs_(unprep_seqs
),
68 wup_snapshot_(snapshot
),
69 backed_by_snapshot_(backed_by_snapshot
) {
70 (void)backed_by_snapshot_
; // to silence unused private field warning
73 virtual ~WriteUnpreparedTxnReadCallback() {
74 // If it is not backed by snapshot, the caller must check validity
75 assert(valid_checked_
|| backed_by_snapshot_
== kBackedByDBSnapshot
);
78 virtual bool IsVisibleFullCheck(SequenceNumber seq
) override
;
81 valid_checked_
= true;
82 return snap_released_
== false;
85 void Refresh(SequenceNumber seq
) override
{
86 max_visible_seq_
= std::max(max_visible_seq_
, seq
);
90 static SequenceNumber
CalcMaxVisibleSeq(
91 const std::map
<SequenceNumber
, size_t>& unprep_seqs
,
92 SequenceNumber snapshot_seq
) {
93 SequenceNumber max_unprepared
= 0;
94 if (unprep_seqs
.size()) {
96 unprep_seqs
.rbegin()->first
+ unprep_seqs
.rbegin()->second
- 1;
98 return std::max(max_unprepared
, snapshot_seq
);
102 WritePreparedTxnDB
* db_
;
103 const std::map
<SequenceNumber
, size_t>& unprep_seqs_
;
104 SequenceNumber wup_snapshot_
;
105 // Whether max_visible_seq_ is backed by a snapshot
106 const SnapshotBackup backed_by_snapshot_
;
107 bool snap_released_
= false;
108 // Safety check to ensure that the caller has checked invalid statuses
109 bool valid_checked_
= false;
112 class WriteUnpreparedTxn
: public WritePreparedTxn
{
114 WriteUnpreparedTxn(WriteUnpreparedTxnDB
* db
,
115 const WriteOptions
& write_options
,
116 const TransactionOptions
& txn_options
);
118 virtual ~WriteUnpreparedTxn();
120 using TransactionBaseImpl::Put
;
121 virtual Status
Put(ColumnFamilyHandle
* column_family
, const Slice
& key
,
123 const bool assume_tracked
= false) override
;
124 virtual Status
Put(ColumnFamilyHandle
* column_family
, const SliceParts
& key
,
125 const SliceParts
& value
,
126 const bool assume_tracked
= false) override
;
128 using TransactionBaseImpl::Merge
;
129 virtual Status
Merge(ColumnFamilyHandle
* column_family
, const Slice
& key
,
131 const bool assume_tracked
= false) override
;
133 using TransactionBaseImpl::Delete
;
134 virtual Status
Delete(ColumnFamilyHandle
* column_family
, const Slice
& key
,
135 const bool assume_tracked
= false) override
;
136 virtual Status
Delete(ColumnFamilyHandle
* column_family
,
137 const SliceParts
& key
,
138 const bool assume_tracked
= false) override
;
140 using TransactionBaseImpl::SingleDelete
;
141 virtual Status
SingleDelete(ColumnFamilyHandle
* column_family
,
143 const bool assume_tracked
= false) override
;
144 virtual Status
SingleDelete(ColumnFamilyHandle
* column_family
,
145 const SliceParts
& key
,
146 const bool assume_tracked
= false) override
;
148 // In WriteUnprepared, untracked writes will break snapshot validation logic.
149 // Snapshot validation will only check the largest sequence number of a key to
150 // see if it was committed or not. However, an untracked unprepared write will
151 // hide smaller committed sequence numbers.
153 // TODO(lth): Investigate whether it is worth having snapshot validation
154 // validate all values larger than snap_seq. Otherwise, we should return
155 // Status::NotSupported for untracked writes.
157 virtual Status
RebuildFromWriteBatch(WriteBatch
*) override
;
159 virtual uint64_t GetLastLogNumber() const override
{
160 return last_log_number_
;
163 void RemoveActiveIterator(Iterator
* iter
) {
164 active_iterators_
.erase(
165 std::remove(active_iterators_
.begin(), active_iterators_
.end(), iter
),
166 active_iterators_
.end());
170 void Initialize(const TransactionOptions
& txn_options
) override
;
172 Status
PrepareInternal() override
;
174 Status
CommitWithoutPrepareInternal() override
;
175 Status
CommitInternal() override
;
177 Status
RollbackInternal() override
;
179 void Clear() override
;
181 void SetSavePoint() override
;
182 Status
RollbackToSavePoint() override
;
183 Status
PopSavePoint() override
;
185 // Get and GetIterator needs to be overridden so that a ReadCallback to
186 // handle read-your-own-write is used.
187 using Transaction::Get
;
188 virtual Status
Get(const ReadOptions
& options
,
189 ColumnFamilyHandle
* column_family
, const Slice
& key
,
190 PinnableSlice
* value
) override
;
192 using Transaction::MultiGet
;
193 virtual void MultiGet(const ReadOptions
& options
,
194 ColumnFamilyHandle
* column_family
,
195 const size_t num_keys
, const Slice
* keys
,
196 PinnableSlice
* values
, Status
* statuses
,
197 const bool sorted_input
= false) override
;
199 using Transaction::GetIterator
;
200 virtual Iterator
* GetIterator(const ReadOptions
& options
) override
;
201 virtual Iterator
* GetIterator(const ReadOptions
& options
,
202 ColumnFamilyHandle
* column_family
) override
;
204 virtual Status
ValidateSnapshot(ColumnFamilyHandle
* column_family
,
206 SequenceNumber
* tracked_at_seq
) override
;
209 friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test
;
210 friend class WriteUnpreparedTransactionTest_RecoveryTest_Test
;
211 friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test
;
212 friend class WriteUnpreparedTxnDB
;
214 const std::map
<SequenceNumber
, size_t>& GetUnpreparedSequenceNumbers();
215 Status
WriteRollbackKeys(const TransactionKeyMap
& tracked_keys
,
216 WriteBatchWithIndex
* rollback_batch
,
217 ReadCallback
* callback
, const ReadOptions
& roptions
);
219 Status
MaybeFlushWriteBatchToDB();
220 Status
FlushWriteBatchToDB(bool prepared
);
221 Status
FlushWriteBatchToDBInternal(bool prepared
);
222 Status
FlushWriteBatchWithSavePointToDB();
223 Status
RollbackToSavePointInternal();
224 Status
HandleWrite(std::function
<Status()> do_write
);
226 // For write unprepared, we check on every writebatch append to see if
227 // write_batch_flush_threshold_ has been exceeded, and then call
228 // FlushWriteBatchToDB if so. This logic is encapsulated in
229 // MaybeFlushWriteBatchToDB.
230 int64_t write_batch_flush_threshold_
;
231 WriteUnpreparedTxnDB
* wupt_db_
;
233 // Ordered list of unprep_seq sequence numbers that we have already written
236 // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
237 // written by this transaction.
239 // Note that this contains both prepared and unprepared batches, since they
240 // are treated similarily in prepare heap/commit map, so it simplifies the
242 std::map
<SequenceNumber
, size_t> unprep_seqs_
;
244 uint64_t last_log_number_
;
246 // Recovered transactions have tracked_keys_ populated, but are not actually
247 // locked for efficiency reasons. For recovered transactions, skip unlocking
248 // keys when transaction ends.
251 // Track the largest sequence number at which we performed snapshot
252 // validation. If snapshot validation was skipped because no snapshot was set,
253 // then this is set to GetLastPublishedSequence. This value is useful because
254 // it means that for keys that have unprepared seqnos, we can guarantee that
255 // no committed keys by other transactions can exist between
256 // largest_validated_seq_ and max_unprep_seq. See
257 // WriteUnpreparedTxnDB::NewIterator for an explanation for why this is
258 // necessary for iterator Prev().
260 // Currently this value only increases during the lifetime of a transaction,
261 // but in some cases, we should be able to restore the previously largest
262 // value when calling RollbackToSavepoint.
263 SequenceNumber largest_validated_seq_
;
265 using KeySet
= std::unordered_map
<uint32_t, std::vector
<std::string
>>;
267 // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
268 // used during RollbackToSavepoint to determine visibility when restoring
271 // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
272 // subsets, this can potentially be deduplicated by just storing set
273 // difference. Investigate if this is worth it.
274 std::map
<SequenceNumber
, size_t> unprep_seqs_
;
276 // This snapshot will be used to read keys at this savepoint if we call
277 // RollbackToSavePoint.
278 std::unique_ptr
<ManagedSnapshot
> snapshot_
;
280 SavePoint(const std::map
<SequenceNumber
, size_t>& seqs
,
281 ManagedSnapshot
* snapshot
)
282 : unprep_seqs_(seqs
), snapshot_(snapshot
){};
285 // We have 3 data structures holding savepoint information:
286 // 1. TransactionBaseImpl::save_points_
287 // 2. WriteUnpreparedTxn::flushed_save_points_
288 // 3. WriteUnpreparecTxn::unflushed_save_points_
290 // TransactionBaseImpl::save_points_ holds information about all write
291 // batches, including the current in-memory write_batch_, or unprepared
292 // batches that have been written out. Its responsibility is just to track
293 // which keys have been modified in every savepoint.
295 // WriteUnpreparedTxn::flushed_save_points_ holds information about savepoints
296 // set on unprepared batches that have already flushed. It holds the snapshot
297 // and unprep_seqs at that savepoint, so that the rollback process can
298 // determine which keys were visible at that point in time.
300 // WriteUnpreparecTxn::unflushed_save_points_ holds information about
301 // savepoints on the current in-memory write_batch_. It simply records the
302 // size of the write batch at every savepoint.
304 // TODO(lth): Remove the redundancy between save_point_boundaries_ and
305 // write_batch_.save_points_.
307 // Based on this information, here are some invariants:
308 // size(unflushed_save_points_) = size(write_batch_.save_points_)
309 // size(flushed_save_points_) + size(unflushed_save_points_)
310 // = size(save_points_)
312 std::unique_ptr
<autovector
<WriteUnpreparedTxn::SavePoint
>>
313 flushed_save_points_
;
314 std::unique_ptr
<autovector
<size_t>> unflushed_save_points_
;
316 // It is currently unsafe to flush a write batch if there are active iterators
317 // created from this transaction. This is because we use WriteBatchWithIndex
318 // to do merging reads from the DB and the write batch. If we flush the write
319 // batch, it is possible that the delta iterator on the iterator will point to
321 std::vector
<Iterator
*> active_iterators_
;
323 // Untracked keys that we have to rollback.
325 // TODO(lth): Currently we we do not record untracked keys per-savepoint.
326 // This means that when rolling back to savepoints, we have to check all
327 // keys in the current transaction for rollback. Note that this is only
328 // inefficient, but still correct because we take a snapshot at every
329 // savepoint, and we will use that snapshot to construct the rollback batch.
330 // The rollback batch will then contain a reissue of the same marker.
332 // A more optimal solution would be to only check keys changed since the
333 // last savepoint. Also, it may make sense to merge this into tracked_keys_
334 // and differentiate between tracked but not locked keys to avoid having two
335 // very similar data structures.
336 KeySet untracked_keys_
;
339 } // namespace ROCKSDB_NAMESPACE
341 #endif // ROCKSDB_LITE