1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include "utilities/transactions/write_prepared_txn.h"

#include <cinttypes>

#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"
23 namespace ROCKSDB_NAMESPACE
{
27 WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB
* txn_db
,
28 const WriteOptions
& write_options
,
29 const TransactionOptions
& txn_options
)
30 : PessimisticTransaction(txn_db
, write_options
, txn_options
, false),
32 // Call Initialize outside PessimisticTransaction constructor otherwise it
33 // would skip overridden functions in WritePreparedTxn since they are not
34 // defined yet in the constructor of PessimisticTransaction
35 Initialize(txn_options
);
38 void WritePreparedTxn::Initialize(const TransactionOptions
& txn_options
) {
39 PessimisticTransaction::Initialize(txn_options
);
40 prepare_batch_cnt_
= 0;
43 void WritePreparedTxn::MultiGet(const ReadOptions
& options
,
44 ColumnFamilyHandle
* column_family
,
45 const size_t num_keys
, const Slice
* keys
,
46 PinnableSlice
* values
, Status
* statuses
,
47 const bool sorted_input
) {
48 SequenceNumber min_uncommitted
, snap_seq
;
49 const SnapshotBackup backed_by_snapshot
=
50 wpt_db_
->AssignMinMaxSeqs(options
.snapshot
, &min_uncommitted
, &snap_seq
);
51 WritePreparedTxnReadCallback
callback(wpt_db_
, snap_seq
, min_uncommitted
,
53 write_batch_
.MultiGetFromBatchAndDB(db_
, options
, column_family
, num_keys
,
54 keys
, values
, statuses
, sorted_input
,
56 if (UNLIKELY(!callback
.valid() ||
57 !wpt_db_
->ValidateSnapshot(snap_seq
, backed_by_snapshot
))) {
58 wpt_db_
->WPRecordTick(TXN_GET_TRY_AGAIN
);
59 for (size_t i
= 0; i
< num_keys
; i
++) {
60 statuses
[i
] = Status::TryAgain();
65 Status
WritePreparedTxn::Get(const ReadOptions
& options
,
66 ColumnFamilyHandle
* column_family
,
67 const Slice
& key
, PinnableSlice
* pinnable_val
) {
68 SequenceNumber min_uncommitted
, snap_seq
;
69 const SnapshotBackup backed_by_snapshot
=
70 wpt_db_
->AssignMinMaxSeqs(options
.snapshot
, &min_uncommitted
, &snap_seq
);
71 WritePreparedTxnReadCallback
callback(wpt_db_
, snap_seq
, min_uncommitted
,
73 auto res
= write_batch_
.GetFromBatchAndDB(db_
, options
, column_family
, key
,
74 pinnable_val
, &callback
);
75 if (LIKELY(callback
.valid() &&
76 wpt_db_
->ValidateSnapshot(callback
.max_visible_seq(),
77 backed_by_snapshot
))) {
80 wpt_db_
->WPRecordTick(TXN_GET_TRY_AGAIN
);
81 return Status::TryAgain();
85 Iterator
* WritePreparedTxn::GetIterator(const ReadOptions
& options
) {
86 // Make sure to get iterator from WritePrepareTxnDB, not the root db.
87 Iterator
* db_iter
= wpt_db_
->NewIterator(options
);
90 return write_batch_
.NewIteratorWithBase(db_iter
);
93 Iterator
* WritePreparedTxn::GetIterator(const ReadOptions
& options
,
94 ColumnFamilyHandle
* column_family
) {
95 // Make sure to get iterator from WritePrepareTxnDB, not the root db.
96 Iterator
* db_iter
= wpt_db_
->NewIterator(options
, column_family
);
99 return write_batch_
.NewIteratorWithBase(column_family
, db_iter
);
102 Status
WritePreparedTxn::PrepareInternal() {
103 WriteOptions write_options
= write_options_
;
104 write_options
.disableWAL
= false;
105 const bool WRITE_AFTER_COMMIT
= true;
106 const bool kFirstPrepareBatch
= true;
107 auto s
= WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
108 name_
, !WRITE_AFTER_COMMIT
);
110 // For each duplicate key we account for a new sub-batch
111 prepare_batch_cnt_
= GetWriteBatch()->SubBatchCnt();
112 // Having AddPrepared in the PreReleaseCallback allows in-order addition of
113 // prepared entries to PreparedHeap and hence enables an optimization. Refer to
114 // SmallestUnCommittedSeq for more details.
115 AddPreparedCallback
add_prepared_callback(
116 wpt_db_
, db_impl_
, prepare_batch_cnt_
,
117 db_impl_
->immutable_db_options().two_write_queues
, kFirstPrepareBatch
);
118 const bool DISABLE_MEMTABLE
= true;
119 uint64_t seq_used
= kMaxSequenceNumber
;
120 s
= db_impl_
->WriteImpl(write_options
, GetWriteBatch()->GetWriteBatch(),
121 /*callback*/ nullptr, &log_number_
, /*log ref*/ 0,
122 !DISABLE_MEMTABLE
, &seq_used
, prepare_batch_cnt_
,
123 &add_prepared_callback
);
124 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
125 auto prepare_seq
= seq_used
;
130 Status
WritePreparedTxn::CommitWithoutPrepareInternal() {
131 // For each duplicate key we account for a new sub-batch
132 const size_t batch_cnt
= GetWriteBatch()->SubBatchCnt();
133 return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt
);
136 Status
WritePreparedTxn::CommitBatchInternal(WriteBatch
* batch
,
138 return wpt_db_
->WriteInternal(write_options_
, batch
, batch_cnt
, this);
141 Status
WritePreparedTxn::CommitInternal() {
142 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
143 "CommitInternal prepare_seq: %" PRIu64
, GetID());
144 // We take the commit-time batch and append the Commit marker.
145 // The Memtable will ignore the Commit marker in non-recovery mode
146 WriteBatch
* working_batch
= GetCommitTimeWriteBatch();
147 const bool empty
= working_batch
->Count() == 0;
148 auto s
= WriteBatchInternal::MarkCommit(working_batch
, name_
);
151 const bool for_recovery
= use_only_the_last_commit_time_batch_for_recovery_
;
152 if (!empty
&& for_recovery
) {
153 // When not writing to memtable, we can still cache the latest write batch.
154 // The cached batch will be written to memtable in WriteRecoverableState
155 // during FlushMemTable
156 WriteBatchInternal::SetAsLastestPersistentState(working_batch
);
159 auto prepare_seq
= GetId();
160 const bool includes_data
= !empty
&& !for_recovery
;
161 assert(prepare_batch_cnt_
);
162 size_t commit_batch_cnt
= 0;
163 if (UNLIKELY(includes_data
)) {
164 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
165 "Duplicate key overhead");
166 SubBatchCounter
counter(*wpt_db_
->GetCFComparatorMap());
167 s
= working_batch
->Iterate(&counter
);
169 commit_batch_cnt
= counter
.BatchCount();
171 const bool disable_memtable
= !includes_data
;
172 const bool do_one_write
=
173 !db_impl_
->immutable_db_options().two_write_queues
|| disable_memtable
;
174 WritePreparedCommitEntryPreReleaseCallback
update_commit_map(
175 wpt_db_
, db_impl_
, prepare_seq
, prepare_batch_cnt_
, commit_batch_cnt
);
176 // This is to call AddPrepared on CommitTimeWriteBatch
177 const bool kFirstPrepareBatch
= true;
178 AddPreparedCallback
add_prepared_callback(
179 wpt_db_
, db_impl_
, commit_batch_cnt
,
180 db_impl_
->immutable_db_options().two_write_queues
, !kFirstPrepareBatch
);
181 PreReleaseCallback
* pre_release_callback
;
183 pre_release_callback
= &update_commit_map
;
185 pre_release_callback
= &add_prepared_callback
;
187 uint64_t seq_used
= kMaxSequenceNumber
;
188 // Since the prepared batch is directly written to memtable, there is already
189 // a connection between the memtable and its WAL, so there is no need to
190 // redundantly reference the log that contains the prepared data.
191 const uint64_t zero_log_number
= 0ull;
192 size_t batch_cnt
= UNLIKELY(commit_batch_cnt
) ? commit_batch_cnt
: 1;
193 s
= db_impl_
->WriteImpl(write_options_
, working_batch
, nullptr, nullptr,
194 zero_log_number
, disable_memtable
, &seq_used
,
195 batch_cnt
, pre_release_callback
);
196 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
197 const SequenceNumber commit_batch_seq
= seq_used
;
198 if (LIKELY(do_one_write
|| !s
.ok())) {
199 if (UNLIKELY(!db_impl_
->immutable_db_options().two_write_queues
&&
201 // Note: RemovePrepared should be called after WriteImpl that publishsed
202 // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
203 wpt_db_
->RemovePrepared(prepare_seq
, prepare_batch_cnt_
);
204 } // else RemovePrepared is called from within PreReleaseCallback
205 if (UNLIKELY(!do_one_write
)) {
207 // Cleanup the prepared entry we added with add_prepared_callback
208 wpt_db_
->RemovePrepared(commit_batch_seq
, commit_batch_cnt
);
211 } // else do the 2nd write to publish seq
212 // Note: the 2nd write comes with a performance penality. So if we have too
213 // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
214 // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
215 // two_write_queues should be disabled to avoid many additional writes here.
216 const size_t kZeroData
= 0;
217 // Update commit map only from the 2nd queue
218 WritePreparedCommitEntryPreReleaseCallback
update_commit_map_with_aux_batch(
219 wpt_db_
, db_impl_
, prepare_seq
, prepare_batch_cnt_
, kZeroData
,
220 commit_batch_seq
, commit_batch_cnt
);
221 WriteBatch empty_batch
;
222 s
= empty_batch
.PutLogData(Slice());
224 // In the absence of Prepare markers, use Noop as a batch separator
225 s
= WriteBatchInternal::InsertNoop(&empty_batch
);
227 const bool DISABLE_MEMTABLE
= true;
228 const size_t ONE_BATCH
= 1;
229 const uint64_t NO_REF_LOG
= 0;
230 s
= db_impl_
->WriteImpl(write_options_
, &empty_batch
, nullptr, nullptr,
231 NO_REF_LOG
, DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
232 &update_commit_map_with_aux_batch
);
233 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
234 if (UNLIKELY(!db_impl_
->immutable_db_options().two_write_queues
)) {
236 // Note: RemovePrepared should be called after WriteImpl that publishsed
237 // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
238 wpt_db_
->RemovePrepared(prepare_seq
, prepare_batch_cnt_
);
240 wpt_db_
->RemovePrepared(commit_batch_seq
, commit_batch_cnt
);
241 } // else RemovePrepared is called from within PreReleaseCallback
245 Status
WritePreparedTxn::RollbackInternal() {
246 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
247 "RollbackInternal prepare_seq: %" PRIu64
, GetId());
248 WriteBatch rollback_batch
;
249 assert(GetId() != kMaxSequenceNumber
);
251 auto cf_map_shared_ptr
= wpt_db_
->GetCFHandleMap();
252 auto cf_comp_map_shared_ptr
= wpt_db_
->GetCFComparatorMap();
253 auto read_at_seq
= kMaxSequenceNumber
;
254 ReadOptions roptions
;
255 // to prevent callback's seq to be overrriden inside DBImpk::Get
256 roptions
.snapshot
= wpt_db_
->GetMaxSnapshot();
257 struct RollbackWriteBatchBuilder
: public WriteBatch::Handler
{
259 WritePreparedTxnReadCallback callback
;
260 WriteBatch
* rollback_batch_
;
261 std::map
<uint32_t, const Comparator
*>& comparators_
;
262 std::map
<uint32_t, ColumnFamilyHandle
*>& handles_
;
263 using CFKeys
= std::set
<Slice
, SetComparator
>;
264 std::map
<uint32_t, CFKeys
> keys_
;
265 bool rollback_merge_operands_
;
266 ReadOptions roptions_
;
267 RollbackWriteBatchBuilder(
268 DBImpl
* db
, WritePreparedTxnDB
* wpt_db
, SequenceNumber snap_seq
,
269 WriteBatch
* dst_batch
,
270 std::map
<uint32_t, const Comparator
*>& comparators
,
271 std::map
<uint32_t, ColumnFamilyHandle
*>& handles
,
272 bool rollback_merge_operands
, ReadOptions _roptions
)
274 callback(wpt_db
, snap_seq
), // disable min_uncommitted optimization
275 rollback_batch_(dst_batch
),
276 comparators_(comparators
),
278 rollback_merge_operands_(rollback_merge_operands
),
279 roptions_(_roptions
) {}
281 Status
Rollback(uint32_t cf
, const Slice
& key
) {
283 CFKeys
& cf_keys
= keys_
[cf
];
284 if (cf_keys
.size() == 0) { // just inserted
285 auto cmp
= comparators_
[cf
];
286 keys_
[cf
] = CFKeys(SetComparator(cmp
));
288 auto it
= cf_keys
.insert(key
);
290 false) { // second is false if a element already existed.
294 PinnableSlice pinnable_val
;
296 auto cf_handle
= handles_
[cf
];
297 DBImpl::GetImplOptions get_impl_options
;
298 get_impl_options
.column_family
= cf_handle
;
299 get_impl_options
.value
= &pinnable_val
;
300 get_impl_options
.value_found
= ¬_used
;
301 get_impl_options
.callback
= &callback
;
302 s
= db_
->GetImpl(roptions_
, key
, get_impl_options
);
303 assert(s
.ok() || s
.IsNotFound());
305 s
= rollback_batch_
->Put(cf_handle
, key
, pinnable_val
);
307 } else if (s
.IsNotFound()) {
308 // There has been no readable value before txn. By adding a delete we
309 // make sure that there will be none afterwards either.
310 s
= rollback_batch_
->Delete(cf_handle
, key
);
313 // Unexpected status. Return it to the user.
318 Status
PutCF(uint32_t cf
, const Slice
& key
, const Slice
& /*val*/) override
{
319 return Rollback(cf
, key
);
322 Status
DeleteCF(uint32_t cf
, const Slice
& key
) override
{
323 return Rollback(cf
, key
);
326 Status
SingleDeleteCF(uint32_t cf
, const Slice
& key
) override
{
327 return Rollback(cf
, key
);
330 Status
MergeCF(uint32_t cf
, const Slice
& key
,
331 const Slice
& /*val*/) override
{
332 if (rollback_merge_operands_
) {
333 return Rollback(cf
, key
);
339 Status
MarkNoop(bool) override
{ return Status::OK(); }
340 Status
MarkBeginPrepare(bool) override
{ return Status::OK(); }
341 Status
MarkEndPrepare(const Slice
&) override
{ return Status::OK(); }
342 Status
MarkCommit(const Slice
&) override
{ return Status::OK(); }
343 Status
MarkRollback(const Slice
&) override
{
344 return Status::InvalidArgument();
348 bool WriteAfterCommit() const override
{ return false; }
349 } rollback_handler(db_impl_
, wpt_db_
, read_at_seq
, &rollback_batch
,
350 *cf_comp_map_shared_ptr
.get(), *cf_map_shared_ptr
.get(),
351 wpt_db_
->txn_db_options_
.rollback_merge_operands
,
353 auto s
= GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler
);
357 // The Rollback marker will be used as a batch separator
358 s
= WriteBatchInternal::MarkRollback(&rollback_batch
, name_
);
360 bool do_one_write
= !db_impl_
->immutable_db_options().two_write_queues
;
361 const bool DISABLE_MEMTABLE
= true;
362 const uint64_t NO_REF_LOG
= 0;
363 uint64_t seq_used
= kMaxSequenceNumber
;
364 const size_t ONE_BATCH
= 1;
365 const bool kFirstPrepareBatch
= true;
366 // We commit the rolled back prepared batches. Although this is
367 // counter-intuitive, i) it is safe to do so, since the prepared batches are
368 // already canceled out by the rollback batch, ii) adding the commit entry to
369 // CommitCache will allow us to benefit from the existing mechanism in
370 // CommitCache that keeps an entry evicted due to max advance and yet overlaps
371 // with a live snapshot around so that the live snapshot properly skips the
372 // entry even if its prepare seq is lower than max_evicted_seq_.
373 AddPreparedCallback
add_prepared_callback(
374 wpt_db_
, db_impl_
, ONE_BATCH
,
375 db_impl_
->immutable_db_options().two_write_queues
, !kFirstPrepareBatch
);
376 WritePreparedCommitEntryPreReleaseCallback
update_commit_map(
377 wpt_db_
, db_impl_
, GetId(), prepare_batch_cnt_
, ONE_BATCH
);
378 PreReleaseCallback
* pre_release_callback
;
380 pre_release_callback
= &update_commit_map
;
382 pre_release_callback
= &add_prepared_callback
;
384 // Note: the rollback batch does not need AddPrepared since it is written to
385 // DB in one shot. min_uncommitted still works since it requires capturing
386 // data that is written to DB but not yet committed, while
387 // the rollback batch commits with PreReleaseCallback.
388 s
= db_impl_
->WriteImpl(write_options_
, &rollback_batch
, nullptr, nullptr,
389 NO_REF_LOG
, !DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
390 pre_release_callback
);
391 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
396 assert(!db_impl_
->immutable_db_options().two_write_queues
);
397 wpt_db_
->RemovePrepared(GetId(), prepare_batch_cnt_
);
399 } // else do the 2nd write for commit
400 uint64_t rollback_seq
= seq_used
;
401 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
402 "RollbackInternal 2nd write rollback_seq: %" PRIu64
,
404 // Commit the batch by writing an empty batch to the queue that will release
405 // the commit sequence number to readers.
406 WritePreparedRollbackPreReleaseCallback
update_commit_map_with_prepare(
407 wpt_db_
, db_impl_
, GetId(), rollback_seq
, prepare_batch_cnt_
);
408 WriteBatch empty_batch
;
409 s
= empty_batch
.PutLogData(Slice());
411 // In the absence of Prepare markers, use Noop as a batch separator
412 s
= WriteBatchInternal::InsertNoop(&empty_batch
);
414 s
= db_impl_
->WriteImpl(write_options_
, &empty_batch
, nullptr, nullptr,
415 NO_REF_LOG
, DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
416 &update_commit_map_with_prepare
);
417 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
418 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
419 "RollbackInternal (status=%s) commit: %" PRIu64
,
420 s
.ToString().c_str(), GetId());
421 // TODO(lth): For WriteUnPrepared that rollback is called frequently,
422 // RemovePrepared could be moved to the callback to reduce lock contention.
424 wpt_db_
->RemovePrepared(GetId(), prepare_batch_cnt_
);
426 // Note: RemovePrepared for prepared batch is called from within
427 // PreReleaseCallback
428 wpt_db_
->RemovePrepared(rollback_seq
, ONE_BATCH
);
433 Status
WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle
* column_family
,
435 SequenceNumber
* tracked_at_seq
) {
438 SequenceNumber min_uncommitted
=
439 static_cast_with_check
<const SnapshotImpl
>(snapshot_
.get())
441 SequenceNumber snap_seq
= snapshot_
->GetSequenceNumber();
442 // tracked_at_seq is either max or the last snapshot with which this key was
443 // trackeed so there is no need to apply the IsInSnapshot to this comparison
444 // here as tracked_at_seq is not a prepare seq.
445 if (*tracked_at_seq
<= snap_seq
) {
446 // If the key has been previous validated at a sequence number earlier
447 // than the curent snapshot's sequence number, we already know it has not
452 *tracked_at_seq
= snap_seq
;
454 ColumnFamilyHandle
* cfh
=
455 column_family
? column_family
: db_impl_
->DefaultColumnFamily();
457 WritePreparedTxnReadCallback
snap_checker(wpt_db_
, snap_seq
, min_uncommitted
,
458 kBackedByDBSnapshot
);
459 return TransactionUtil::CheckKeyForConflicts(db_impl_
, cfh
, key
.ToString(),
460 snap_seq
, false /* cache_only */,
461 &snap_checker
, min_uncommitted
);
464 void WritePreparedTxn::SetSnapshot() {
465 const bool kForWWConflictCheck
= true;
466 SnapshotImpl
* snapshot
= wpt_db_
->GetSnapshotInternal(kForWWConflictCheck
);
467 SetSnapshotInternal(snapshot
);
470 Status
WritePreparedTxn::RebuildFromWriteBatch(WriteBatch
* src_batch
) {
471 auto ret
= PessimisticTransaction::RebuildFromWriteBatch(src_batch
);
472 prepare_batch_cnt_
= GetWriteBatch()->SubBatchCnt();
476 } // namespace ROCKSDB_NAMESPACE
478 #endif // ROCKSDB_LITE