// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#include <cinttypes>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"
23 namespace ROCKSDB_NAMESPACE
{
27 WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB
* txn_db
,
28 const WriteOptions
& write_options
,
29 const TransactionOptions
& txn_options
)
30 : PessimisticTransaction(txn_db
, write_options
, txn_options
, false),
32 // Call Initialize outside PessimisticTransaction constructor otherwise it
33 // would skip overridden functions in WritePreparedTxn since they are not
34 // defined yet in the constructor of PessimisticTransaction
35 Initialize(txn_options
);
38 void WritePreparedTxn::Initialize(const TransactionOptions
& txn_options
) {
39 PessimisticTransaction::Initialize(txn_options
);
40 prepare_batch_cnt_
= 0;
43 void WritePreparedTxn::MultiGet(const ReadOptions
& options
,
44 ColumnFamilyHandle
* column_family
,
45 const size_t num_keys
, const Slice
* keys
,
46 PinnableSlice
* values
, Status
* statuses
,
47 const bool sorted_input
) {
48 SequenceNumber min_uncommitted
, snap_seq
;
49 const SnapshotBackup backed_by_snapshot
=
50 wpt_db_
->AssignMinMaxSeqs(options
.snapshot
, &min_uncommitted
, &snap_seq
);
51 WritePreparedTxnReadCallback
callback(wpt_db_
, snap_seq
, min_uncommitted
,
53 write_batch_
.MultiGetFromBatchAndDB(db_
, options
, column_family
, num_keys
,
54 keys
, values
, statuses
, sorted_input
,
56 if (UNLIKELY(!callback
.valid() ||
57 !wpt_db_
->ValidateSnapshot(snap_seq
, backed_by_snapshot
))) {
58 wpt_db_
->WPRecordTick(TXN_GET_TRY_AGAIN
);
59 for (size_t i
= 0; i
< num_keys
; i
++) {
60 statuses
[i
] = Status::TryAgain();
65 Status
WritePreparedTxn::Get(const ReadOptions
& options
,
66 ColumnFamilyHandle
* column_family
,
67 const Slice
& key
, PinnableSlice
* pinnable_val
) {
68 SequenceNumber min_uncommitted
, snap_seq
;
69 const SnapshotBackup backed_by_snapshot
=
70 wpt_db_
->AssignMinMaxSeqs(options
.snapshot
, &min_uncommitted
, &snap_seq
);
71 WritePreparedTxnReadCallback
callback(wpt_db_
, snap_seq
, min_uncommitted
,
73 Status res
= write_batch_
.GetFromBatchAndDB(db_
, options
, column_family
, key
,
74 pinnable_val
, &callback
);
75 const bool callback_valid
=
76 callback
.valid(); // NOTE: validity of callback must always be checked
77 // before it is destructed
79 if (!LIKELY(callback_valid
&&
80 wpt_db_
->ValidateSnapshot(callback
.max_visible_seq(),
81 backed_by_snapshot
))) {
82 wpt_db_
->WPRecordTick(TXN_GET_TRY_AGAIN
);
83 res
= Status::TryAgain();
90 Iterator
* WritePreparedTxn::GetIterator(const ReadOptions
& options
) {
91 // Make sure to get iterator from WritePrepareTxnDB, not the root db.
92 Iterator
* db_iter
= wpt_db_
->NewIterator(options
);
95 return write_batch_
.NewIteratorWithBase(db_iter
);
98 Iterator
* WritePreparedTxn::GetIterator(const ReadOptions
& options
,
99 ColumnFamilyHandle
* column_family
) {
100 // Make sure to get iterator from WritePrepareTxnDB, not the root db.
101 Iterator
* db_iter
= wpt_db_
->NewIterator(options
, column_family
);
104 return write_batch_
.NewIteratorWithBase(column_family
, db_iter
);
107 Status
WritePreparedTxn::PrepareInternal() {
108 WriteOptions write_options
= write_options_
;
109 write_options
.disableWAL
= false;
110 const bool WRITE_AFTER_COMMIT
= true;
111 const bool kFirstPrepareBatch
= true;
112 auto s
= WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
113 name_
, !WRITE_AFTER_COMMIT
);
115 // For each duplicate key we account for a new sub-batch
116 prepare_batch_cnt_
= GetWriteBatch()->SubBatchCnt();
117 // Having AddPrepared in the PreReleaseCallback allows in-order addition of
118 // prepared entries to PreparedHeap and hence enables an optimization. Refer
119 // to SmallestUnCommittedSeq for more details.
120 AddPreparedCallback
add_prepared_callback(
121 wpt_db_
, db_impl_
, prepare_batch_cnt_
,
122 db_impl_
->immutable_db_options().two_write_queues
, kFirstPrepareBatch
);
123 const bool DISABLE_MEMTABLE
= true;
124 uint64_t seq_used
= kMaxSequenceNumber
;
125 s
= db_impl_
->WriteImpl(write_options
, GetWriteBatch()->GetWriteBatch(),
126 /*callback*/ nullptr, &log_number_
, /*log ref*/ 0,
127 !DISABLE_MEMTABLE
, &seq_used
, prepare_batch_cnt_
,
128 &add_prepared_callback
);
129 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
130 auto prepare_seq
= seq_used
;
135 Status
WritePreparedTxn::CommitWithoutPrepareInternal() {
136 // For each duplicate key we account for a new sub-batch
137 const size_t batch_cnt
= GetWriteBatch()->SubBatchCnt();
138 return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt
);
141 Status
WritePreparedTxn::CommitBatchInternal(WriteBatch
* batch
,
143 return wpt_db_
->WriteInternal(write_options_
, batch
, batch_cnt
, this);
146 Status
WritePreparedTxn::CommitInternal() {
147 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
148 "CommitInternal prepare_seq: %" PRIu64
, GetID());
149 // We take the commit-time batch and append the Commit marker.
150 // The Memtable will ignore the Commit marker in non-recovery mode
151 WriteBatch
* working_batch
= GetCommitTimeWriteBatch();
152 const bool empty
= working_batch
->Count() == 0;
153 auto s
= WriteBatchInternal::MarkCommit(working_batch
, name_
);
156 const bool for_recovery
= use_only_the_last_commit_time_batch_for_recovery_
;
158 // When not writing to memtable, we can still cache the latest write batch.
159 // The cached batch will be written to memtable in WriteRecoverableState
160 // during FlushMemTable
162 WriteBatchInternal::SetAsLatestPersistentState(working_batch
);
164 return Status::InvalidArgument(
165 "Commit-time-batch can only be used if "
166 "use_only_the_last_commit_time_batch_for_recovery is true");
170 auto prepare_seq
= GetId();
171 const bool includes_data
= !empty
&& !for_recovery
;
172 assert(prepare_batch_cnt_
);
173 size_t commit_batch_cnt
= 0;
174 if (UNLIKELY(includes_data
)) {
175 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
176 "Duplicate key overhead");
177 SubBatchCounter
counter(*wpt_db_
->GetCFComparatorMap());
178 s
= working_batch
->Iterate(&counter
);
180 commit_batch_cnt
= counter
.BatchCount();
182 const bool disable_memtable
= !includes_data
;
183 const bool do_one_write
=
184 !db_impl_
->immutable_db_options().two_write_queues
|| disable_memtable
;
185 WritePreparedCommitEntryPreReleaseCallback
update_commit_map(
186 wpt_db_
, db_impl_
, prepare_seq
, prepare_batch_cnt_
, commit_batch_cnt
);
187 // This is to call AddPrepared on CommitTimeWriteBatch
188 const bool kFirstPrepareBatch
= true;
189 AddPreparedCallback
add_prepared_callback(
190 wpt_db_
, db_impl_
, commit_batch_cnt
,
191 db_impl_
->immutable_db_options().two_write_queues
, !kFirstPrepareBatch
);
192 PreReleaseCallback
* pre_release_callback
;
194 pre_release_callback
= &update_commit_map
;
196 pre_release_callback
= &add_prepared_callback
;
198 uint64_t seq_used
= kMaxSequenceNumber
;
199 // Since the prepared batch is directly written to memtable, there is already
200 // a connection between the memtable and its WAL, so there is no need to
201 // redundantly reference the log that contains the prepared data.
202 const uint64_t zero_log_number
= 0ull;
203 size_t batch_cnt
= UNLIKELY(commit_batch_cnt
) ? commit_batch_cnt
: 1;
204 // If `two_write_queues && includes_data`, then `do_one_write` is false. The
205 // following `WriteImpl` will insert the data of the commit-time-batch into
206 // the database before updating the commit cache. Therefore, the data of the
207 // commmit-time-batch is considered uncommitted. Furthermore, since data of
208 // the commit-time-batch are not locked, it is possible for two uncommitted
209 // versions of the same key to co-exist for a (short) period of time until
210 // the commit cache is updated by the second write. If the two uncommitted
211 // keys are compacted to the bottommost level in the meantime, it is possible
212 // that compaction iterator will zero out the sequence numbers of both, thus
213 // violating the invariant that an SST does not have two identical internal
214 // keys. To prevent this situation, we should allow the usage of
215 // commit-time-batch only if the user sets
216 // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
217 // true. See the comments about GetCommitTimeWriteBatch() in
218 // include/rocksdb/utilities/transaction.h.
219 s
= db_impl_
->WriteImpl(write_options_
, working_batch
, nullptr, nullptr,
220 zero_log_number
, disable_memtable
, &seq_used
,
221 batch_cnt
, pre_release_callback
);
222 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
223 const SequenceNumber commit_batch_seq
= seq_used
;
224 if (LIKELY(do_one_write
|| !s
.ok())) {
225 if (UNLIKELY(!db_impl_
->immutable_db_options().two_write_queues
&&
227 // Note: RemovePrepared should be called after WriteImpl that publishsed
228 // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
229 wpt_db_
->RemovePrepared(prepare_seq
, prepare_batch_cnt_
);
230 } // else RemovePrepared is called from within PreReleaseCallback
231 if (UNLIKELY(!do_one_write
)) {
233 // Cleanup the prepared entry we added with add_prepared_callback
234 wpt_db_
->RemovePrepared(commit_batch_seq
, commit_batch_cnt
);
237 } // else do the 2nd write to publish seq
238 // Note: the 2nd write comes with a performance penality. So if we have too
239 // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
240 // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
241 // two_write_queues should be disabled to avoid many additional writes here.
242 const size_t kZeroData
= 0;
243 // Update commit map only from the 2nd queue
244 WritePreparedCommitEntryPreReleaseCallback
update_commit_map_with_aux_batch(
245 wpt_db_
, db_impl_
, prepare_seq
, prepare_batch_cnt_
, kZeroData
,
246 commit_batch_seq
, commit_batch_cnt
);
247 WriteBatch empty_batch
;
248 s
= empty_batch
.PutLogData(Slice());
250 // In the absence of Prepare markers, use Noop as a batch separator
251 s
= WriteBatchInternal::InsertNoop(&empty_batch
);
253 const bool DISABLE_MEMTABLE
= true;
254 const size_t ONE_BATCH
= 1;
255 const uint64_t NO_REF_LOG
= 0;
256 s
= db_impl_
->WriteImpl(write_options_
, &empty_batch
, nullptr, nullptr,
257 NO_REF_LOG
, DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
258 &update_commit_map_with_aux_batch
);
259 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
263 Status
WritePreparedTxn::RollbackInternal() {
264 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
265 "RollbackInternal prepare_seq: %" PRIu64
, GetId());
270 WriteBatch
rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
271 write_options_
.protection_bytes_per_key
,
272 0 /* default_cf_ts_sz */);
273 assert(GetId() != kMaxSequenceNumber
);
275 auto cf_map_shared_ptr
= wpt_db_
->GetCFHandleMap();
276 auto cf_comp_map_shared_ptr
= wpt_db_
->GetCFComparatorMap();
277 auto read_at_seq
= kMaxSequenceNumber
;
278 ReadOptions roptions
;
279 // to prevent callback's seq to be overrriden inside DBImpk::Get
280 roptions
.snapshot
= wpt_db_
->GetMaxSnapshot();
281 struct RollbackWriteBatchBuilder
: public WriteBatch::Handler
{
283 WritePreparedTxnDB
* const wpt_db_
;
284 WritePreparedTxnReadCallback callback_
;
285 WriteBatch
* rollback_batch_
;
286 std::map
<uint32_t, const Comparator
*>& comparators_
;
287 std::map
<uint32_t, ColumnFamilyHandle
*>& handles_
;
288 using CFKeys
= std::set
<Slice
, SetComparator
>;
289 std::map
<uint32_t, CFKeys
> keys_
;
290 bool rollback_merge_operands_
;
291 ReadOptions roptions_
;
293 RollbackWriteBatchBuilder(
294 DBImpl
* db
, WritePreparedTxnDB
* wpt_db
, SequenceNumber snap_seq
,
295 WriteBatch
* dst_batch
,
296 std::map
<uint32_t, const Comparator
*>& comparators
,
297 std::map
<uint32_t, ColumnFamilyHandle
*>& handles
,
298 bool rollback_merge_operands
, const ReadOptions
& _roptions
)
301 callback_(wpt_db
, snap_seq
), // disable min_uncommitted optimization
302 rollback_batch_(dst_batch
),
303 comparators_(comparators
),
305 rollback_merge_operands_(rollback_merge_operands
),
306 roptions_(_roptions
) {}
308 Status
Rollback(uint32_t cf
, const Slice
& key
) {
310 CFKeys
& cf_keys
= keys_
[cf
];
311 if (cf_keys
.size() == 0) { // just inserted
312 auto cmp
= comparators_
[cf
];
313 keys_
[cf
] = CFKeys(SetComparator(cmp
));
315 auto it
= cf_keys
.insert(key
);
316 // second is false if a element already existed.
317 if (it
.second
== false) {
321 PinnableSlice pinnable_val
;
323 auto cf_handle
= handles_
[cf
];
324 DBImpl::GetImplOptions get_impl_options
;
325 get_impl_options
.column_family
= cf_handle
;
326 get_impl_options
.value
= &pinnable_val
;
327 get_impl_options
.value_found
= ¬_used
;
328 get_impl_options
.callback
= &callback_
;
329 s
= db_
->GetImpl(roptions_
, key
, get_impl_options
);
330 assert(s
.ok() || s
.IsNotFound());
332 s
= rollback_batch_
->Put(cf_handle
, key
, pinnable_val
);
334 } else if (s
.IsNotFound()) {
335 // There has been no readable value before txn. By adding a delete we
336 // make sure that there will be none afterwards either.
337 if (wpt_db_
->ShouldRollbackWithSingleDelete(cf_handle
, key
)) {
338 s
= rollback_batch_
->SingleDelete(cf_handle
, key
);
340 s
= rollback_batch_
->Delete(cf_handle
, key
);
344 // Unexpected status. Return it to the user.
349 Status
PutCF(uint32_t cf
, const Slice
& key
, const Slice
& /*val*/) override
{
350 return Rollback(cf
, key
);
353 Status
DeleteCF(uint32_t cf
, const Slice
& key
) override
{
354 return Rollback(cf
, key
);
357 Status
SingleDeleteCF(uint32_t cf
, const Slice
& key
) override
{
358 return Rollback(cf
, key
);
361 Status
MergeCF(uint32_t cf
, const Slice
& key
,
362 const Slice
& /*val*/) override
{
363 if (rollback_merge_operands_
) {
364 return Rollback(cf
, key
);
370 Status
MarkNoop(bool) override
{ return Status::OK(); }
371 Status
MarkBeginPrepare(bool) override
{ return Status::OK(); }
372 Status
MarkEndPrepare(const Slice
&) override
{ return Status::OK(); }
373 Status
MarkCommit(const Slice
&) override
{ return Status::OK(); }
374 Status
MarkRollback(const Slice
&) override
{
375 return Status::InvalidArgument();
379 Handler::OptionState
WriteAfterCommit() const override
{
380 return Handler::OptionState::kDisabled
;
382 } rollback_handler(db_impl_
, wpt_db_
, read_at_seq
, &rollback_batch
,
383 *cf_comp_map_shared_ptr
.get(), *cf_map_shared_ptr
.get(),
384 wpt_db_
->txn_db_options_
.rollback_merge_operands
,
386 auto s
= GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler
);
390 // The Rollback marker will be used as a batch separator
391 s
= WriteBatchInternal::MarkRollback(&rollback_batch
, name_
);
393 bool do_one_write
= !db_impl_
->immutable_db_options().two_write_queues
;
394 const bool DISABLE_MEMTABLE
= true;
395 const uint64_t NO_REF_LOG
= 0;
396 uint64_t seq_used
= kMaxSequenceNumber
;
397 const size_t ONE_BATCH
= 1;
398 const bool kFirstPrepareBatch
= true;
399 // We commit the rolled back prepared batches. Although this is
400 // counter-intuitive, i) it is safe to do so, since the prepared batches are
401 // already canceled out by the rollback batch, ii) adding the commit entry to
402 // CommitCache will allow us to benefit from the existing mechanism in
403 // CommitCache that keeps an entry evicted due to max advance and yet overlaps
404 // with a live snapshot around so that the live snapshot properly skips the
405 // entry even if its prepare seq is lower than max_evicted_seq_.
406 AddPreparedCallback
add_prepared_callback(
407 wpt_db_
, db_impl_
, ONE_BATCH
,
408 db_impl_
->immutable_db_options().two_write_queues
, !kFirstPrepareBatch
);
409 WritePreparedCommitEntryPreReleaseCallback
update_commit_map(
410 wpt_db_
, db_impl_
, GetId(), prepare_batch_cnt_
, ONE_BATCH
);
411 PreReleaseCallback
* pre_release_callback
;
413 pre_release_callback
= &update_commit_map
;
415 pre_release_callback
= &add_prepared_callback
;
417 // Note: the rollback batch does not need AddPrepared since it is written to
418 // DB in one shot. min_uncommitted still works since it requires capturing
419 // data that is written to DB but not yet committed, while
420 // the rollback batch commits with PreReleaseCallback.
421 s
= db_impl_
->WriteImpl(write_options_
, &rollback_batch
, nullptr, nullptr,
422 NO_REF_LOG
, !DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
423 pre_release_callback
);
424 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
429 assert(!db_impl_
->immutable_db_options().two_write_queues
);
430 wpt_db_
->RemovePrepared(GetId(), prepare_batch_cnt_
);
432 } // else do the 2nd write for commit
433 uint64_t rollback_seq
= seq_used
;
434 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
435 "RollbackInternal 2nd write rollback_seq: %" PRIu64
,
437 // Commit the batch by writing an empty batch to the queue that will release
438 // the commit sequence number to readers.
439 WritePreparedRollbackPreReleaseCallback
update_commit_map_with_prepare(
440 wpt_db_
, db_impl_
, GetId(), rollback_seq
, prepare_batch_cnt_
);
441 WriteBatch empty_batch
;
442 s
= empty_batch
.PutLogData(Slice());
444 // In the absence of Prepare markers, use Noop as a batch separator
445 s
= WriteBatchInternal::InsertNoop(&empty_batch
);
447 s
= db_impl_
->WriteImpl(write_options_
, &empty_batch
, nullptr, nullptr,
448 NO_REF_LOG
, DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
449 &update_commit_map_with_prepare
);
450 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
451 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
452 "RollbackInternal (status=%s) commit: %" PRIu64
,
453 s
.ToString().c_str(), GetId());
454 // TODO(lth): For WriteUnPrepared that rollback is called frequently,
455 // RemovePrepared could be moved to the callback to reduce lock contention.
457 wpt_db_
->RemovePrepared(GetId(), prepare_batch_cnt_
);
459 // Note: RemovePrepared for prepared batch is called from within
460 // PreReleaseCallback
461 wpt_db_
->RemovePrepared(rollback_seq
, ONE_BATCH
);
466 Status
WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle
* column_family
,
468 SequenceNumber
* tracked_at_seq
) {
471 SequenceNumber min_uncommitted
=
472 static_cast_with_check
<const SnapshotImpl
>(snapshot_
.get())
474 SequenceNumber snap_seq
= snapshot_
->GetSequenceNumber();
475 // tracked_at_seq is either max or the last snapshot with which this key was
476 // trackeed so there is no need to apply the IsInSnapshot to this comparison
477 // here as tracked_at_seq is not a prepare seq.
478 if (*tracked_at_seq
<= snap_seq
) {
479 // If the key has been previous validated at a sequence number earlier
480 // than the curent snapshot's sequence number, we already know it has not
485 *tracked_at_seq
= snap_seq
;
487 ColumnFamilyHandle
* cfh
=
488 column_family
? column_family
: db_impl_
->DefaultColumnFamily();
490 WritePreparedTxnReadCallback
snap_checker(wpt_db_
, snap_seq
, min_uncommitted
,
491 kBackedByDBSnapshot
);
492 // TODO(yanqin): support user-defined timestamp
493 return TransactionUtil::CheckKeyForConflicts(
494 db_impl_
, cfh
, key
.ToString(), snap_seq
, /*ts=*/nullptr,
495 false /* cache_only */, &snap_checker
, min_uncommitted
);
498 void WritePreparedTxn::SetSnapshot() {
499 const bool kForWWConflictCheck
= true;
500 SnapshotImpl
* snapshot
= wpt_db_
->GetSnapshotInternal(kForWWConflictCheck
);
501 SetSnapshotInternal(snapshot
);
504 Status
WritePreparedTxn::RebuildFromWriteBatch(WriteBatch
* src_batch
) {
505 auto ret
= PessimisticTransaction::RebuildFromWriteBatch(src_batch
);
506 prepare_batch_cnt_
= GetWriteBatch()->SubBatchCnt();
510 } // namespace ROCKSDB_NAMESPACE
512 #endif // ROCKSDB_LITE