// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"
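
// WritePreparedTxn implements the WRITE_PREPARED policy: the write batch is
// written to the DB at Prepare() time, tagged with a prepare sequence number,
// and Commit() later publishes a commit entry that makes the data visible to
// readers. A minimal usage sketch (hypothetical client code, assuming a
// TransactionDB opened with txn_db_options.write_policy = WRITE_PREPARED):
//
//   Transaction* txn = txn_db->BeginTransaction(write_options);
//   txn->SetName("xid1");                    // required before Prepare()
//   txn->Put(Slice("key"), Slice("value"));
//   txn->Prepare();  // batch goes to the WAL (and memtable), still invisible
//   txn->Commit();   // commit entry makes the data visible to readers
//   delete txn;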

namespace rocksdb {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options),
      wpt_db_(txn_db) {}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}
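
// Reads consult the transaction's own write batch first and then the DB, with
// a WritePreparedTxnReadCallback deciding visibility: a sequence number is
// visible only if IsInSnapshot() determines it was committed before the
// snapshot sequence, so prepared-but-uncommitted data from other transactions
// is skipped.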
Status WritePreparedTxn::Get(const ReadOptions& read_options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  auto snapshot = read_options.snapshot;
  auto snap_seq =
      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;  // by default disable the optimization
  if (snapshot != nullptr) {
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }

  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
                                        pinnable_val, &callback);
}
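
// Iterators go through WritePreparedTxnDB::NewIterator, which installs the
// same snapshot-aware visibility check over the base iterator; the result is
// then merged with the transaction's own write batch via NewIteratorWithBase.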
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
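
// Prepare writes the accumulated batch to the WAL and the memtable in a
// single WriteImpl call; the sequence number consumed by the first sub-batch
// becomes the transaction id (the prepare seq). The data stays invisible to
// readers until CommitInternal publishes a matching commit entry.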
Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT);
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better to be called in the pre-release callback; otherwise
  // there is a non-zero chance of max advancing past prepare_seq and readers
  // assuming the prepared data is committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PrepareHeap and hence enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  Status s = db_impl_->WriteImpl(
      write_options, GetWriteBatch()->GetWriteBatch(),
      /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
      &seq_used, prepare_batch_cnt_, &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}
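
// A batch may contain multiple updates to the same key. Since a memtable
// cannot hold two entries for one key at the same sequence number, such a
// batch is split into sub-batches, each consuming its own sequence number;
// SubBatchCnt() reports how many.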
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}
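
// Commit writes the Commit marker (plus any commit-time batch) and, via
// WritePreparedCommitEntryPreReleaseCallback, records (prepare_seq,
// commit_seq) in the commit cache before the sequence is published. This is a
// single write unless two_write_queues is set and the commit-time batch must
// also reach the memtable, in which case a second write publishes the
// sequence afterwards.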
Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  const bool publish_seq = do_one_write;
  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written
  // to DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // CommitTimeWriteBatch commits with PreReleaseCallback.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
      publish_seq);
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, &update_commit_map);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after the WriteImpl that
      // published the seq; otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied with CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
        : db_impl_(db_impl) {}
    virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
#ifdef NDEBUG
      (void)is_mem_disabled;
#endif
      assert(is_mem_disabled);
      assert(db_impl_->immutable_db_options().two_write_queues);
      db_impl_->SetLastPublishedSequence(seq);
      return Status::OK();
    }

   private:
    DBImpl* db_impl_;
  } publish_seq_callback(db_impl_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &publish_seq_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after the WriteImpl that published
  // the seq; otherwise the SmallestUnCommittedSeq optimization breaks.
  wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
  return s;
}
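
// Rollback restores the pre-transaction state: for every key touched by the
// prepared batch, the value visible at last_visible_txn = GetId() - 1 is read
// back and re-written (or a Delete is issued if none existed). The rollback
// batch and the prepared batch are then both marked committed, since they
// cancel each other out, which lets the existing CommitCache machinery handle
// snapshot visibility.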
Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  // In WritePrepared, the txn id is the same as the prepare seq
  auto last_visible_txn = GetId() - 1;
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    ReadOptions roptions;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands)
        : db_(db),
          callback(wpt_db, snap_seq,
                   0),  // 0 disables min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands) {}

    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      if (it.second ==
          false) {  // second is false if an element already existed.
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
                       &callback);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key,
                 const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    virtual bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to CommitCache will allow us to benefit from the existing mechanism
  // in CommitCache that keeps an entry evicted due to max advance, and yet
  // overlapping with a live snapshot, around so that the live snapshot
  // properly skips the entry even if its prepare seq is lower than
  // max_evicted_seq_.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t& prepare_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back
  uint64_t& rollback_seq = seq_used;
  if (s.ok()) {
    // Note: it is safe to do it after PreReleaseCallback via WriteImpl since
    // all the writes by the prepared batch are already blinded by the rollback
    // batch. The only reason we commit the prepared batch here is to benefit
    // from the existing mechanism in CommitCache that takes care of the rare
    // cases that the prepare seq is visible to a snapshot but max evicted seq
    // advances past that prepare seq.
    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
      wpt_db_->AddCommitted(GetId() + i, rollback_seq);
    }
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }

  return s;
}
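
// Write-conflict detection: verifies that the key has not been written after
// the transaction's snapshot. tracked_at_seq records the last snapshot at
// which the key was validated, so already-validated keys can be skipped.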
Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker);
}

void WritePreparedTxn::SetSnapshot() {
  // Note: for this optimization, setting the last sequence number and
  // obtaining the smallest uncommitted seq should be done atomically. However,
  // to avoid the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking
  // the snapshot. Since we always update the list of prepared (not yet
  // committed) seqs (via AddPrepared) AFTER the last sequence is updated, this
  // guarantees that the smallest uncommitted seq that we pair with the
  // snapshot is smaller than or equal to the value that would be obtained
  // otherwise atomically. That is ok since this optimization works as long as
  // min_uncommitted is less than or equal to the smallest uncommitted seq when
  // the snapshot was taken.
  auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
  const bool FOR_WW_CONFLICT_CHECK = true;
  SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
  assert(snapshot);
  wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
  SetSnapshotInternal(snapshot);
}
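
// Primarily used when recovering a prepared transaction from the WAL: the
// batch is replayed into the transaction and the sub-batch count is
// recomputed so that commit/rollback account for duplicate keys correctly.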
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE