1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/transactions/write_unprepared_txn.h"
9 #include "db/db_impl.h"
10 #include "util/cast_util.h"
11 #include "utilities/transactions/write_unprepared_txn_db.h"
13 #ifndef __STDC_FORMAT_MACROS
14 #define __STDC_FORMAT_MACROS
19 bool WriteUnpreparedTxnReadCallback::IsVisible(SequenceNumber seq
) {
20 auto unprep_seqs
= txn_
->GetUnpreparedSequenceNumbers();
22 // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
23 // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
24 // the prepare_batch_cnt seq nums after it.
26 // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
28 for (const auto& it
: unprep_seqs
) {
29 if (it
.first
<= seq
&& seq
< it
.first
+ it
.second
) {
34 return db_
->IsInSnapshot(seq
, snapshot_
, min_uncommitted_
);
// Returns the last sequence number of the unprepared batch with the highest
// prep_seq: that entry's prep_seq plus its prepare_batch_cnt, minus one.
// NOTE(review): the value returned when unprep_seqs is empty is not visible in
// this view — confirm.
37 SequenceNumber
WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() {
// NOTE(review): `auto` copies the map; GetUnpreparedSequenceNumbers() returns a
// const reference, so `const auto&` would avoid the copy.
38 auto unprep_seqs
= txn_
->GetUnpreparedSequenceNumbers();
// std::map iterates in key order, so rbegin() is the entry with the highest
// prep_seq; its batch occupies prep_seq .. prep_seq + cnt - 1.
39 if (unprep_seqs
.size()) {
40 return unprep_seqs
.rbegin()->first
+ unprep_seqs
.rbegin()->second
- 1;
46 WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB
* txn_db
,
47 const WriteOptions
& write_options
,
48 const TransactionOptions
& txn_options
)
49 : WritePreparedTxn(txn_db
, write_options
, txn_options
), wupt_db_(txn_db
) {
50 max_write_batch_size_
= txn_options
.max_write_batch_size
;
51 // We set max bytes to zero so that we don't get a memory limit error.
52 // Instead of trying to keep write batch strictly under the size limit, we
53 // just flush to DB when the limit is exceeded in write unprepared, to avoid
54 // having retry logic. This also allows very big key-value pairs that exceed
55 // max bytes to succeed.
56 write_batch_
.SetMaxBytes(0);
// Destroys the transaction.
//
// If batches of this transaction were already written to the DB (unprep_seqs_
// is non-empty) and the transaction is still STARTED, they are rolled back.
// The rollback status is intentionally unused. The logs-with-prep tracker is
// also notified via MarkLogAsHavingPrepSectionFlushed.
// NOTE(review): the arguments of that call, and whether it sits inside the
// STARTED branch, are not visible here. It presumably releases log_number_,
// undoing MarkLogAsContainingPrepSection in MaybeFlushWriteBatchToDB.
59 WriteUnpreparedTxn::~WriteUnpreparedTxn() {
60 if (!unprep_seqs_
.empty()) {
// Writing a batch to the DB requires a name (see FlushWriteBatchToDB) and
// assigns log_number_, so both must be set here.
61 assert(log_number_
> 0);
63 assert(!name_
.empty());
65 // We should rollback regardless of GetState, but some unit tests that
66 // test crash recovery run the destructor assuming that rollback does not
67 // happen, so that rollback during recovery can be exercised.
68 if (GetState() == STARTED
) {
69 auto s
__attribute__((__unused__
)) = RollbackInternal();
70 // TODO(lth): Better error handling.
72 dbimpl_
->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
78 void WriteUnpreparedTxn::Initialize(const TransactionOptions
& txn_options
) {
79 PessimisticTransaction::Initialize(txn_options
);
80 max_write_batch_size_
= txn_options
.max_write_batch_size
;
81 write_batch_
.SetMaxBytes(0);
83 write_set_keys_
.clear();
86 Status
WriteUnpreparedTxn::Put(ColumnFamilyHandle
* column_family
,
87 const Slice
& key
, const Slice
& value
) {
88 Status s
= MaybeFlushWriteBatchToDB();
92 return TransactionBaseImpl::Put(column_family
, key
, value
);
95 Status
WriteUnpreparedTxn::Put(ColumnFamilyHandle
* column_family
,
96 const SliceParts
& key
, const SliceParts
& value
) {
97 Status s
= MaybeFlushWriteBatchToDB();
101 return TransactionBaseImpl::Put(column_family
, key
, value
);
104 Status
WriteUnpreparedTxn::Merge(ColumnFamilyHandle
* column_family
,
105 const Slice
& key
, const Slice
& value
) {
106 Status s
= MaybeFlushWriteBatchToDB();
110 return TransactionBaseImpl::Merge(column_family
, key
, value
);
113 Status
WriteUnpreparedTxn::Delete(ColumnFamilyHandle
* column_family
,
115 Status s
= MaybeFlushWriteBatchToDB();
119 return TransactionBaseImpl::Delete(column_family
, key
);
122 Status
WriteUnpreparedTxn::Delete(ColumnFamilyHandle
* column_family
,
123 const SliceParts
& key
) {
124 Status s
= MaybeFlushWriteBatchToDB();
128 return TransactionBaseImpl::Delete(column_family
, key
);
131 Status
WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle
* column_family
,
133 Status s
= MaybeFlushWriteBatchToDB();
137 return TransactionBaseImpl::SingleDelete(column_family
, key
);
140 Status
WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle
* column_family
,
141 const SliceParts
& key
) {
142 Status s
= MaybeFlushWriteBatchToDB();
146 return TransactionBaseImpl::SingleDelete(column_family
, key
);
// Flushes the buffered write batch to the DB as an unprepared batch when
// max_write_batch_size_ is non-zero and the batch's data size exceeds it.
//
// After a flush, the logs-with-prep tracker is told, via
// MarkLogAsContainingPrepSection, that the log holds transaction data.
// NOTE(review): several lines are not present in this view: the declaration
// of `s`, the check of the flush status, the line using needs_mark
// (presumably guarding the Mark call so it happens only once), the call's
// arguments, and the return.
149 Status
WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
150 const bool kPrepared
= true;
// Record whether a WAL log was already assigned before this flush.
// FlushWriteBatchToDB sets log_number_ via its log_used out-parameter when it
// is still 0.
153 bool needs_mark
= (log_number_
== 0);
155 if (max_write_batch_size_
!= 0 &&
156 write_batch_
.GetDataSize() > max_write_batch_size_
) {
// Writes are not expected once the transaction has been prepared.
157 assert(GetState() != PREPARED
);
158 s
= FlushWriteBatchToDB(!kPrepared
);
160 assert(log_number_
> 0);
161 // This is done to prevent WAL files after log_number_ from being
162 // deleted, because they could potentially contain unprepared batches.
164 dbimpl_
->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
172 void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid
, const Slice
& key
) {
173 // TODO(lth): write_set_keys_ can just be a std::string instead of a vector.
174 write_set_keys_
[cfid
].push_back(key
.ToString());
// Writes the buffered write batch to the DB as one batch and resets the buffer.
//
// Steps:
//   1. Record the batch's keys in write_set_keys_ (via KeySetBuilder) so a
//      later rollback can undo them.
//   2. Mark the batch with MarkEndPrepare. `prepared` is passed there as
//      !prepared, presumably selecting a prepared vs. unprepared end marker.
//   3. Write it with the WAL enabled.
//   4. Store the resulting prepare sequence number in unprep_seqs_ together
//      with the batch's sub-batch count.
// NOTE(review): in this view the InvalidArgument return has no guarding
// condition. The message implies it applies when no name was set — confirm.
// The status checks, the SetId call and the final return are also not visible.
177 Status
WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared
) {
179 return Status::InvalidArgument("Cannot write to DB without SetName.");
182 // Update write_set_keys_ for rollback purposes.
183 KeySetBuilder
keyset_handler(
184 this, wupt_db_
->txn_db_options_
.rollback_merge_operands
);
185 auto s
= GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler
);
191 // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
// This write always goes to the WAL, whatever the user's disableWAL setting.
192 WriteOptions write_options
= write_options_
;
193 write_options
.disableWAL
= false;
194 const bool WRITE_AFTER_COMMIT
= true;
195 // MarkEndPrepare will change Noop marker to the appropriate marker.
196 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_
,
197 !WRITE_AFTER_COMMIT
, !prepared
);
198 // For each duplicate key we account for a new sub-batch
199 prepare_batch_cnt_
= GetWriteBatch()->SubBatchCnt();
200 // AddPrepared better to be called in the pre-release callback otherwise there
201 // is a non-zero chance of max advancing prepare_seq and readers assume the
202 // data as committed.
203 // Also having it in the PreReleaseCallback allows in-order addition of
204 // prepared entries to PrepareHeap and hence enables an optimization. Refer to
205 // SmallestUnCommittedSeq for more details.
206 AddPreparedCallback
add_prepared_callback(
207 wpt_db_
, prepare_batch_cnt_
,
208 db_impl_
->immutable_db_options().two_write_queues
);
209 const bool DISABLE_MEMTABLE
= true;
210 uint64_t seq_used
= kMaxSequenceNumber
;
211 // log_number_ should refer to the oldest log containing uncommitted data
212 // from the current transaction. This means that if log_number_ is set,
213 // WriteImpl should not overwrite that value, so set log_used to nullptr if
214 // log_number_ is already set.
215 uint64_t* log_used
= log_number_
? nullptr : &log_number_
;
216 s
= db_impl_
->WriteImpl(write_options
, GetWriteBatch()->GetWriteBatch(),
217 /*callback*/ nullptr, log_used
, /*log ref*/
218 0, !DISABLE_MEMTABLE
, &seq_used
, prepare_batch_cnt_
,
219 &add_prepared_callback
);
220 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
// The sequence number assigned to this write becomes the batch's prepare seq.
221 auto prepare_seq
= seq_used
;
223 // Only call SetId if it hasn't been set yet.
227 // unprep_seqs_ will also contain prepared seqnos since they are treated in
228 // the same way in the prepare/commit callbacks. See the comment on the
229 // definition of unprep_seqs_.
230 unprep_seqs_
[prepare_seq
] = prepare_batch_cnt_
;
232 // Reset transaction state.
234 prepare_batch_cnt_
= 0;
235 write_batch_
.Clear();
236 WriteBatchInternal::InsertNoop(write_batch_
.GetWriteBatch());
242 Status
WriteUnpreparedTxn::PrepareInternal() {
243 const bool kPrepared
= true;
244 return FlushWriteBatchToDB(kPrepared
);
247 Status
WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
248 if (unprep_seqs_
.empty()) {
249 assert(log_number_
== 0);
250 assert(GetId() == 0);
251 return WritePreparedTxn::CommitWithoutPrepareInternal();
254 // TODO(lth): We should optimize commit without prepare to not perform
255 // a prepare under the hood.
256 auto s
= PrepareInternal();
260 return CommitInternal();
// Commits the transaction.
//
// The commit-time write batch gets a Commit marker and is written with a
// WriteUnpreparedCommitEntryPreReleaseCallback that is given unprep_seqs_
// (presumably to add commit entries for every batch this transaction wrote).
// When two_write_queues is enabled and the commit batch carries data, one
// write is not enough: a second, empty write is issued solely to publish the
// sequence number. On success each entry of unprep_seqs_ is removed with
// RemovePrepared, and unprep_seqs_ and write_set_keys_ are cleared.
// NOTE(review): several lines are not present in this view: status checks,
// closing braces, the `public:` section of the local callback class, and the
// return statements.
263 Status
WriteUnpreparedTxn::CommitInternal() {
264 // TODO(lth): Reduce duplicate code with WritePrepared commit logic.
266 // We take the commit-time batch and append the Commit marker. The Memtable
267 // will ignore the Commit marker in non-recovery mode
268 WriteBatch
* working_batch
= GetCommitTimeWriteBatch();
269 const bool empty
= working_batch
->Count() == 0;
270 WriteBatchInternal::MarkCommit(working_batch
, name_
);
272 const bool for_recovery
= use_only_the_last_commit_time_batch_for_recovery_
;
273 if (!empty
&& for_recovery
) {
274 // When not writing to memtable, we can still cache the latest write batch.
275 // The cached batch will be written to memtable in WriteRecoverableState
276 // during FlushMemTable
277 WriteBatchInternal::SetAsLastestPersistentState(working_batch
);
// The commit batch goes to the memtable only when it has data and is not
// just cached for recovery.
280 const bool includes_data
= !empty
&& !for_recovery
;
// Number of sub-batches in the commit batch (duplicate keys add sub-batches);
// only computed when the batch carries data.
281 size_t commit_batch_cnt
= 0;
282 if (UNLIKELY(includes_data
)) {
283 ROCKS_LOG_WARN(db_impl_
->immutable_db_options().info_log
,
284 "Duplicate key overhead");
285 SubBatchCounter
counter(*wpt_db_
->GetCFComparatorMap());
286 auto s
= working_batch
->Iterate(&counter
);
288 commit_batch_cnt
= counter
.BatchCount();
290 const bool disable_memtable
= !includes_data
;
// A single write both commits and publishes the sequence number unless
// two_write_queues is on and the memtable is written.
291 const bool do_one_write
=
292 !db_impl_
->immutable_db_options().two_write_queues
|| disable_memtable
;
293 const bool publish_seq
= do_one_write
;
294 // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
295 // DB in one shot. min_uncommitted still works since it requires capturing
296 // data that is written to DB but not yet committed, while
297 // CommitTimeWriteBatch commits with PreReleaseCallback.
298 WriteUnpreparedCommitEntryPreReleaseCallback
update_commit_map(
299 wpt_db_
, db_impl_
, unprep_seqs_
, commit_batch_cnt
, publish_seq
);
300 uint64_t seq_used
= kMaxSequenceNumber
;
301 // Since the prepared batch is directly written to memtable, there is already
302 // a connection between the memtable and its WAL, so there is no need to
303 // redundantly reference the log that contains the prepared data.
304 const uint64_t zero_log_number
= 0ull;
// Treat the write as one batch when no sub-batches were counted.
305 size_t batch_cnt
= UNLIKELY(commit_batch_cnt
) ? commit_batch_cnt
: 1;
306 auto s
= db_impl_
->WriteImpl(write_options_
, working_batch
, nullptr, nullptr,
307 zero_log_number
, disable_memtable
, &seq_used
,
308 batch_cnt
, &update_commit_map
);
309 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
310 if (LIKELY(do_one_write
|| !s
.ok())) {
311 if (LIKELY(s
.ok())) {
312 // Note RemovePrepared should be called after WriteImpl that published
313 // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
314 for (const auto& seq
: unprep_seqs_
) {
315 wpt_db_
->RemovePrepared(seq
.first
, seq
.second
);
318 unprep_seqs_
.clear();
319 write_set_keys_
.clear();
321 } // else do the 2nd write to publish seq
322 // Note: the 2nd write comes with a performance penalty. So if we have too
323 // many commits accompanied with CommitTimeWriteBatch and yet we cannot
324 // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
325 // two_write_queues should be disabled to avoid many additional writes here.
// Pre-release callback for the second write: it only publishes the sequence
// number assigned to that write via SetLastPublishedSequence.
326 class PublishSeqPreReleaseCallback
: public PreReleaseCallback
{
328 explicit PublishSeqPreReleaseCallback(DBImpl
* db_impl
)
329 : db_impl_(db_impl
) {}
330 virtual Status
Callback(SequenceNumber seq
, bool is_mem_disabled
331 __attribute__((__unused__
))) override
{
332 assert(is_mem_disabled
);
333 assert(db_impl_
->immutable_db_options().two_write_queues
);
334 db_impl_
->SetLastPublishedSequence(seq
);
340 } publish_seq_callback(db_impl_
);
// An otherwise empty batch (a log-data entry plus a Noop separator) is
// written only to obtain and publish a sequence number.
341 WriteBatch empty_batch
;
342 empty_batch
.PutLogData(Slice());
343 // In the absence of Prepare markers, use Noop as a batch separator
344 WriteBatchInternal::InsertNoop(&empty_batch
);
345 const bool DISABLE_MEMTABLE
= true;
346 const size_t ONE_BATCH
= 1;
347 const uint64_t NO_REF_LOG
= 0;
348 s
= db_impl_
->WriteImpl(write_options_
, &empty_batch
, nullptr, nullptr,
349 NO_REF_LOG
, DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
350 &publish_seq_callback
);
351 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
352 // Note RemovePrepared should be called after WriteImpl that published the
353 // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
354 for (const auto& seq
: unprep_seqs_
) {
355 wpt_db_
->RemovePrepared(seq
.first
, seq
.second
);
357 unprep_seqs_
.clear();
358 write_set_keys_
.clear();
// Rolls back everything this transaction has already written to the DB.
//
// For each key recorded in write_set_keys_, the value visible at GetId() - 1
// is read back (GetId() is the prepare seq of the transaction's first batch).
// That value is written into a rollback batch as a Put, or as a Delete if the
// key was not found. The rollback batch is written with a Rollback marker.
// The original batches in unprep_seqs_ are then committed (see the comment
// below for why) and removed from the prepared set, and unprep_seqs_ and
// write_set_keys_ are cleared. With two_write_queues, the commit happens in a
// second, empty write.
// NOTE(review): several lines are not present in this view: the remaining
// GetImpl arguments, the `if (s.ok())` branch head, status checks, closing
// braces, and the returns.
362 Status
WriteUnpreparedTxn::RollbackInternal() {
363 // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
364 WriteBatchWithIndex
rollback_batch(
365 wpt_db_
->DefaultColumnFamily()->GetComparator(), 0, true, 0);
366 assert(GetId() != kMaxSequenceNumber
);
368 const auto& cf_map
= *wupt_db_
->GetCFHandleMap();
369 // In WritePrepared, the txn id is the same as the prepare seq
370 auto last_visible_txn
= GetId() - 1;
373 ReadOptions roptions
;
374 // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
375 // need to read our own writes when reading prior versions of the key for
377 WritePreparedTxnReadCallback
callback(wpt_db_
, last_visible_txn
, 0);
378 for (const auto& cfkey
: write_set_keys_
) {
379 const auto cfid
= cfkey
.first
;
380 const auto& keys
= cfkey
.second
;
381 for (const auto& key
: keys
) {
382 const auto& cf_handle
= cf_map
.at(cfid
);
383 PinnableSlice pinnable_val
;
// NOTE(review): `¬_used` below looks like a mis-encoded `&not_used` (the
// HTML entity `&not;` was decoded) — restore the original token.
385 s
= db_impl_
->GetImpl(roptions
, cf_handle
, key
, &pinnable_val
, ¬_used
,
// Key existed before the transaction: restore its prior value.
389 s
= rollback_batch
.Put(cf_handle
, key
, pinnable_val
);
// Key did not exist before the transaction: delete it.
391 } else if (s
.IsNotFound()) {
392 s
= rollback_batch
.Delete(cf_handle
, key
);
400 // The Rollback marker will be used as a batch separator
401 WriteBatchInternal::MarkRollback(rollback_batch
.GetWriteBatch(), name_
);
// With two write queues, the commit entries are added by a second write below
// rather than by this one.
402 bool do_one_write
= !db_impl_
->immutable_db_options().two_write_queues
;
403 const bool DISABLE_MEMTABLE
= true;
404 const uint64_t NO_REF_LOG
= 0;
405 uint64_t seq_used
= kMaxSequenceNumber
;
406 // TODO(lth): We write rollback batch all in a single batch here, but this
407 // should be subdivided into multiple batches as well. In phase 2, when key
408 // sets are read from WAL, this will happen naturally.
409 const size_t ONE_BATCH
= 1;
410 // We commit the rolled back prepared batches. Although this is
411 // counter-intuitive, i) it is safe to do so, since the prepared batches are
412 // already canceled out by the rollback batch, ii) adding the commit entry to
413 // CommitCache will allow us to benefit from the existing mechanism in
414 // CommitCache that keeps an entry evicted due to max advance and yet overlaps
415 // with a live snapshot around so that the live snapshot properly skips the
416 // entry even if its prepare seq is lower than max_evicted_seq_.
417 WriteUnpreparedCommitEntryPreReleaseCallback
update_commit_map(
418 wpt_db_
, db_impl_
, unprep_seqs_
, ONE_BATCH
);
419 // Note: the rollback batch does not need AddPrepared since it is written to
420 // DB in one shot. min_uncommitted still works since it requires capturing
421 // data that is written to DB but not yet committed, while the rollback
422 // batch commits with PreReleaseCallback.
423 s
= db_impl_
->WriteImpl(write_options_
, rollback_batch
.GetWriteBatch(),
424 nullptr, nullptr, NO_REF_LOG
, !DISABLE_MEMTABLE
,
425 &seq_used
, rollback_batch
.SubBatchCnt(),
426 do_one_write
? &update_commit_map
: nullptr);
427 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
432 for (const auto& seq
: unprep_seqs_
) {
433 wpt_db_
->RemovePrepared(seq
.first
, seq
.second
);
435 unprep_seqs_
.clear();
436 write_set_keys_
.clear();
438 } // else do the 2nd write for commit
// The rollback batch's sequence number acts as the prepare seq for the
// commit entry written by the second write.
439 uint64_t& prepare_seq
= seq_used
;
440 ROCKS_LOG_DETAILS(db_impl_
->immutable_db_options().info_log
,
441 "RollbackInternal 2nd write prepare_seq: %" PRIu64
,
443 // Commit the batch by writing an empty batch to the queue that will release
444 // the commit sequence number to readers.
445 const size_t ZERO_COMMITS
= 0;
446 WritePreparedCommitEntryPreReleaseCallback
update_commit_map_with_prepare(
447 wpt_db_
, db_impl_
, prepare_seq
, ONE_BATCH
, ZERO_COMMITS
);
448 WriteBatch empty_batch
;
449 empty_batch
.PutLogData(Slice());
450 // In the absence of Prepare markers, use Noop as a batch separator
451 WriteBatchInternal::InsertNoop(&empty_batch
);
452 s
= db_impl_
->WriteImpl(write_options_
, &empty_batch
, nullptr, nullptr,
453 NO_REF_LOG
, DISABLE_MEMTABLE
, &seq_used
, ONE_BATCH
,
454 &update_commit_map_with_prepare
);
455 assert(!s
.ok() || seq_used
!= kMaxSequenceNumber
);
456 // Mark the txn as rolled back
457 uint64_t& rollback_seq
= seq_used
;
459 // Note: it is safe to do it after PreReleaseCallback via WriteImpl since
460 // all the writes by the prepared batch are already blinded by the rollback
461 // batch. The only reason we commit the prepared batch here is to benefit
462 // from the existing mechanism in CommitCache that takes care of the rare
463 // cases that the prepare seq is visible to a snapshot but max evicted seq
464 // advances that prepare seq.
465 for (const auto& seq
: unprep_seqs_
) {
466 for (size_t i
= 0; i
< seq
.second
; i
++) {
467 wpt_db_
->AddCommitted(seq
.first
+ i
, rollback_seq
);
470 for (const auto& seq
: unprep_seqs_
) {
471 wpt_db_
->RemovePrepared(seq
.first
, seq
.second
);
475 unprep_seqs_
.clear();
476 write_set_keys_
.clear();
// Reads `key` from `column_family` through the transaction.
//
// The lookup goes through write_batch_.GetFromBatchAndDB, combining the
// buffered write batch with the DB. DB reads are filtered by a
// WriteUnpreparedTxnReadCallback, so the transaction's own batches already
// written to the DB count as visible alongside the snapshot's view.
// NOTE(review): the snap_seq declaration, the min_uncommitted assignment and
// the trailing arguments of both calls are not present in this view.
480 Status
WriteUnpreparedTxn::Get(const ReadOptions
& options
,
481 ColumnFamilyHandle
* column_family
,
482 const Slice
& key
, PinnableSlice
* value
) {
483 auto snapshot
= options
.snapshot
;
// Presumably initializes snap_seq: the snapshot's sequence number, or
// kMaxSequenceNumber when no snapshot is given.
485 snapshot
!= nullptr ? snapshot
->GetSequenceNumber() : kMaxSequenceNumber
;
486 SequenceNumber min_uncommitted
= 0; // by default disable the optimization
// With an explicit snapshot, min_uncommitted is presumably taken from the
// SnapshotImpl (the full assignment is not visible here).
487 if (snapshot
!= nullptr) {
489 static_cast_with_check
<const SnapshotImpl
, const Snapshot
>(snapshot
)
493 WriteUnpreparedTxnReadCallback
callback(wupt_db_
, snap_seq
, min_uncommitted
,
495 return write_batch_
.GetFromBatchAndDB(db_
, options
, column_family
, key
, value
,
499 Iterator
* WriteUnpreparedTxn::GetIterator(const ReadOptions
& options
) {
500 return GetIterator(options
, wupt_db_
->DefaultColumnFamily());
// Returns an iterator over `column_family` that overlays the transaction's
// buffered write batch on a DB iterator (write_batch_.NewIteratorWithBase).
//
// The DB iterator comes from WriteUnpreparedTxnDB::NewIterator with `this`
// passed in, presumably so it can see this transaction's batches already
// written to the DB.
// NOTE(review): original lines 507-508 are not present in this view.
503 Iterator
* WriteUnpreparedTxn::GetIterator(const ReadOptions
& options
,
504 ColumnFamilyHandle
* column_family
) {
505 // Make sure to get iterator from WriteUnpreparedTxnDB, not the root db.
506 Iterator
* db_iter
= wupt_db_
->NewIterator(options
, column_family
, this);
509 return write_batch_
.NewIteratorWithBase(column_family
, db_iter
);
// Returns a const reference to the map prep_seq => prepare_batch_cnt
// describing the transaction's batches already written to the DB. Callers can
// bind it by reference instead of copying.
// NOTE(review): the body is not visible here; presumably it returns
// unprep_seqs_.
512 const std::map
<SequenceNumber
, size_t>&
513 WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
517 } // namespace rocksdb
519 #endif // ROCKSDB_LITE