// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn_db.h"

#include <algorithm>
#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl.h"
#include "logging/logging.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
// This function is for testing only. If it returns true, then all entries in
// the commit cache will be evicted. Unit and/or stress tests (db_stress)
// can implement this function and customize how frequently commit cache
// eviction occurs.
// TODO: remove this function once we can configure commit cache to be very
// small so that eviction occurs very frequently. This requires the commit
// cache entry to be able to encode prepare and commit sequence numbers so that
// the commit sequence number does not have to be within a certain range of
// prepare sequence number.
extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void)
    __attribute__((__weak__));
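// For illustration only: a test binary can override the weak symbol above
// with a strong definition, e.g. (hypothetical):
//
//   extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void) {
//     return true;  // evict the whole commit cache on every eviction
//   }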
namespace ROCKSDB_NAMESPACE {
Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    ordered_seq_cnt[seq] = cnt;
  }
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}
Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}
Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}
Status WritePreparedTxnDB::Write(const WriteOptions& opts,
                                 WriteBatch* updates) {
  if (txn_db_options_.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
  } else {
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts,
                                                                 updates);
  }
}
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts,
                                                                 updates);
  }
}
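// Illustrative caller-side sketch: a user who has already locked the rows and
// knows the batch contains no duplicate keys can skip both costs:
//
//   TransactionDBWriteOptimizations optimizations;
//   optimizations.skip_concurrency_control = true;
//   optimizations.skip_duplicate_key_check = true;  // batch_cnt becomes 1
//   txn_db->Write(write_options, optimizations, &batch);
//
// If either assumption is violated the result is undefined.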
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }

  if (write_options_orig.protection_bytes_per_key > 0) {
    auto s = WriteBatchInternal::UpdateProtectionInfo(
        batch, write_options_orig.protection_bytes_per_key);
    if (!s.ok()) {
      return s;
    }
  }

  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    if (!s.ok()) {
      return s;
    }
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  // In the absence of Prepare markers, use Noop as a batch separator
  auto s = WriteBatchInternal::InsertNoop(batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  const bool kSeperatePrepareCommitBatches = true;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, db_impl_, batch_cnt,
      db_impl_->immutable_db_options().two_write_queues,
      !kSeperatePrepareCommitBatches);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
                          !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  write_options.disableWAL = true;
  write_options.sync = false;
  const size_t ONE_BATCH = 1;  // Just to inc the seq
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared is called from within PreReleaseCallback
  return s;
}
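// Sequence-number walk-through (illustrative): with two_write_queues and
// batch_cnt == 2, the first WriteImpl above consumes sequence numbers
// [s, s+1] as the prepare range, and the second, memtable-disabled,
// one-sub-batch write consumes s+2; update_commit_map_with_prepare then
// records <[s, s+1] -> s+2> in the commit cache, making the batch visible
// to readers.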
Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  bool* dont_care = nullptr;
  DBImpl::GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.value_found = dont_care;
  get_impl_options.callback = &callback;
  auto res = db_impl_->GetImpl(options, key, get_impl_options);
  if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
                                                  backed_by_snapshot))) {
    return res;
  } else {
    res.PermitUncheckedError();
    WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}
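// Note on the TryAgain path above (a reading of the logic, not new behavior):
// when the read is not backed by an explicit snapshot, a concurrent advance of
// max_evicted_seq_ past the read's implicit snapshot invalidates the callback,
// so the caller is asked to retry and obtain a fresh snapshot view.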
void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}
void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}
std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    stat_list[i] =
        this->Get(options, column_family[i], keys[i], &(*values)[i]);
  }
  return stat_list;
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
        snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                expose_blob_index, allow_refresh);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}
Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  expose_blob_index, allow_refresh);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}
void WritePreparedTxnDB::Init(const TransactionDBOptions& txn_db_opts) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
  dummy_max_snapshot_.number_ = kMaxSequenceNumber;
  rollback_deletion_type_callback_ =
      txn_db_opts.rollback_deletion_type_callback;
}
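// Numeric example (assuming the default commit cache of 2^23 entries): with
// COMMIT_CACHE_SIZE = 8,388,608, INC_STEP_FOR_MAX_EVICTED is 83,886, so
// max_evicted_seq_ advances in large steps, at most ~100 times per wrap of
// the commit cache.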
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
                                                 bool locked) {
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_ and save an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  ROCKS_LOG_DETAILS(
      info_log_,
      "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
      prepared_txns_.empty(),
      prepared_txns_.empty() ? 0 : prepared_txns_.top());
  const SequenceNumber prepared_top = prepared_txns_.top();
  const bool empty = prepared_top == kMaxSequenceNumber;
  // Preliminary check to avoid the synchronization cost
  if (!empty && prepared_top <= new_max) {
    if (locked) {
      // Needed to avoid double locking in pop().
      prepared_txns_.push_pop_mutex()->Unlock();
    }
    WriteLock wl(&prepared_mutex_);
    // Need to fetch fresh values of ::top after mutex is acquired
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 ")",
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max);
      delayed_prepared_empty_.store(false, std::memory_order_release);
      // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
      // there will be a point in time that the entry is neither in
      // prepared_txns_ nor in delayed_prepared_, which will not be checked if
      // delayed_prepared_empty_ is false.
      prepared_txns_.pop();
    }
    if (locked) {
      prepared_txns_.push_pop_mutex()->Lock();
    }
  }
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
                    seq, max_evicted_seq_.load());
  TEST_SYNC_POINT("AddPrepared::begin:pause");
  TEST_SYNC_POINT("AddPrepared::begin:resume");
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Lock();
  }
  prepared_txns_.push_pop_mutex()->AssertHeld();
  prepared_txns_.push(seq);
  auto new_max = future_max_evicted_seq_.load();
  if (UNLIKELY(seq <= new_max)) {
    // This should not happen in normal case
    ROCKS_LOG_ERROR(
        info_log_,
        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
        " <= %" PRIu64,
        seq, new_max);
    CheckPreparedAgainstMax(new_max, true /*locked*/);
  }
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Unlock();
  }
  TEST_SYNC_POINT("AddPrepared::end");
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      auto last = db_impl_->GetLastPublishedSequence();  // could be 0
      SequenceNumber max_evicted_seq;
      if (LIKELY(evicted.commit_seq < last)) {
        assert(last > 0);
        // Inc max in larger steps to avoid frequent updates
        max_evicted_seq =
            std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
      } else {
        // legit when a commit entry in a write batch overwrite the previous one
        max_evicted_seq = evicted.commit_seq;
      }
      if (rocksdb_write_prepared_TEST_ShouldClearCommitCache &&
          rocksdb_write_prepared_TEST_ShouldClearCommitCache()) {
        max_evicted_seq = last;
      }
      ROCKS_LOG_DETAILS(info_log_,
                        "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
                        " => %" PRIu64,
                        prepare_seq, evicted.prep_seq, evicted.commit_seq,
                        prev_max, max_evicted_seq);
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
      WriteLock wl(&prepared_mutex_);
      auto dp_iter = delayed_prepared_.find(evicted.prep_seq);
      if (dp_iter != delayed_prepared_.end()) {
        // This is a rare case that txn is committed but prepared_txns_ is not
        // cleaned up yet. Refer to delayed_prepared_commits_ definition for
        // why it should be kept updated.
        delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
        ROCKS_LOG_DEBUG(info_log_,
                        "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
                        evicted.prep_seq, evicted.commit_seq);
      }
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}
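// Indexing example (illustrative): indexed_seq = prepare_seq %
// COMMIT_CACHE_SIZE, so with an 8M-slot cache the entries for prepare seqs
// 5 and 5 + 2^23 share a slot; adding the latter evicts the former, which
// triggers the max_evicted_seq_ advance and the snapshot overlap check above.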
void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  TEST_SYNC_POINT_CALLBACK(
      "RemovePrepared:Start",
      const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  ROCKS_LOG_DETAILS(info_log_,
                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                    prepare_seq, batch_cnt);
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      auto it = delayed_prepared_commits_.find(prepare_seq + i);
      if (it != delayed_prepared_commits_.end()) {
        ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
                          it->first);
        delayed_prepared_commits_.erase(it);
      }
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // Declare the intention before getting snapshot from the DB. This helps a
  // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
  // it has not already. Otherwise the new snapshot may be taken after we ask
  // the DB for snapshots smaller than future max.
  auto updated_future_max = prev_max;
  while (updated_future_max < new_max &&
         !future_max_evicted_seq_.compare_exchange_weak(
             updated_future_max, new_max, std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
  };

  CheckPreparedAgainstMax(new_max, false /*locked*/);

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of snapshots to identify how fresh the
  // snapshot list is. This works because the snapshots are between 0 and
  // max, so the larger the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it was already updated
    // with a more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
    if (!snapshots.empty()) {
      WriteLock wl(&old_commit_map_mutex_);
      for (auto snap : snapshots) {
        // This allows IsInSnapshot to tell apart reads from invalid snapshots
        // from reads of committed values in valid snapshots.
        old_commit_map_[snap];
      }
      old_commit_map_empty_.store(false, std::memory_order_release);
    }
  }
  auto updated_prev_max = prev_max;
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}
const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  const bool kForWWConflictCheck = true;
  return GetSnapshotInternal(!kForWWConflictCheck);
}
SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
    bool for_ww_conflict_check) {
  // Note: for this optimization setting the last sequence number and obtaining
  // the smallest uncommitted seq should be done atomically. However to avoid
  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
  // snapshot. Since we always update the list of unprepared seq (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller or equal
  // to the value that would be obtained otherwise atomically. That is ok since
  // this optimization works as long as min_uncommitted is less than or equal
  // to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
  assert(snap_impl);
  SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  // Note: Check against future_max_evicted_seq_ (in contrast with
  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
    // There is a very rare case in which the commit entry evicts another commit
    // entry that is not published yet thus advancing max evicted seq beyond the
    // last published seq. This case is not likely in real-world setup so we
    // handle it with a few retries.
    size_t retry = 0;
    SequenceNumber max;
    while ((max = future_max_evicted_seq_.load()) != 0 &&
           snap_impl->GetSequenceNumber() <= max && retry < 100) {
      ROCKS_LOG_WARN(info_log_,
                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
                     " retry %" ROCKSDB_PRIszt,
                     snap_impl->GetSequenceNumber(), max, retry);
      ReleaseSnapshot(snap_impl);
      // Wait for last visible seq to catch up with max, and also go beyond it
      // by one.
      AdvanceSeqByOne();
      snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
      assert(snap_impl);
      retry++;
    }
    assert(snap_impl->GetSequenceNumber() > max);
    if (snap_impl->GetSequenceNumber() <= max) {
      throw std::runtime_error(
          "Snapshot seq " + std::to_string(snap_impl->GetSequenceNumber()) +
          " after " + std::to_string(retry) +
          " retries is still less than future_max_evicted_seq_" +
          std::to_string(max));
    }
  }
  EnhanceSnapshot(snap_impl, min_uncommitted);
  ROCKS_LOG_DETAILS(
      db_impl_->immutable_db_options().info_log,
      "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
  return snap_impl;
}
void WritePreparedTxnDB::AdvanceSeqByOne() {
  // Inserting an empty value will i) let the max evicted entry to be
  // published, i.e., max == last_published, ii) increase the last published to
  // be one beyond max, i.e., max < last_published.
  WriteOptions woptions;
  TransactionOptions txn_options;
  Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  std::hash<std::thread::id> hasher;
  char name[64];
  snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
  assert(strlen(name) < 64 - 1);
  Status s = txn0->SetName(name);
  assert(s.ok());
  if (s.ok()) {
    // Without prepare it would simply skip the commit
    s = txn0->Prepare();
  }
  assert(s.ok());
  if (s.ok()) {
    s = txn0->Commit();
  }
  assert(s.ok());
  delete txn0;
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  db_impl_->mutex()->AssertHeld();
  return db_impl_->snapshots().GetAll(nullptr, max);
}
void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // TODO(myabandeh): relaxed ordering should be enough since the
  // synchronization is already done by snapshots_mutex_ under which this
  // function is called.
  if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
    // Then this is a rare case that transaction did not finish before max
    // advances. It is expected for a few read-only backup snapshots. For such
    // snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}
void WritePreparedTxnDB::CleanupReleasedSnapshots(
    const std::vector<SequenceNumber>& new_snapshots,
    const std::vector<SequenceNumber>& old_snapshots) {
  auto newi = new_snapshots.begin();
  auto oldi = old_snapshots.begin();
  for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
    assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
    if (*newi == *oldi) {    // still not released
      auto value = *newi;
      while (newi != new_snapshots.end() && *newi == value) {
        newi++;
      }
      while (oldi != old_snapshots.end() && *oldi == value) {
        oldi++;
      }
    } else {
      assert(*newi > *oldi);  // *oldi is released
      ReleaseSnapshotInternal(*oldi);
      oldi++;
    }
  }
  // Everything remained in old_snapshots is released and must be cleaned up
  for (; oldi != old_snapshots.end(); oldi++) {
    ReleaseSnapshotInternal(*oldi);
  }
}
void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear higher in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item that
  // is still valid in the new list is either written to the same place in
  // the array or to a higher place before it gets overwritten by another
  // item. This guarantees that a reader that reads the list bottom-up will
  // eventually see a snapshot that repeats in the update, either before it
  // gets overwritten by the writer or afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); ++it) {
    // Insert them to a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);

  // Note: this must be done after the snapshots data structures are updated
  // with the new list of snapshots.
  CleanupReleasedSnapshots(snapshots, snapshots_all_);
  snapshots_all_ = snapshots;

  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the survived snapshots are written in a higher
  // place before they get overwritten, the reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  // We will set to true if the border line snapshot suggests that.
  bool search_larger_list = false;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    SequenceNumber snapshot_seq =
        snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (ip1 == SNAPSHOT_CACHE_SIZE) {  // border line snapshot
      // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
      // then later also continue the search to larger snapshots
      search_larger_list = snapshot_seq < evicted.commit_seq;
    }
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
                   "> with %" ROCKSDB_PRIszt " snapshots",
                   evicted.prep_seq, evicted.commit_seq, cnt);
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      SequenceNumber snapshot_seq =
          snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}
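// Border-line example (illustrative): if SNAPSHOT_CACHE_SIZE were 4 and the
// cached snapshots were {10, 20, 30, 40} with more snapshots overflowing to
// snapshots_, a commit entry with commit_seq = 45 sets search_larger_list at
// the border (40 < 45), so the overflow list is also scanned under the lock.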
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed in
  // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
  // the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "old_commit_map_mutex_ overhead for %" PRIu64
                   " commit entry: <%" PRIu64 ",%" PRIu64 ">",
                   snapshot_seq, prep_seq, commit_seq);
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Returning true to
    // continue the search if there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}
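// Worked example (illustrative): for commit entry <prep_seq=10, commit_seq=20>
// and snapshot_seq=15, prep_seq <= snapshot_seq < commit_seq holds, so 10 is
// recorded under old_commit_map_[15]; IsInSnapshot(10, 15) can then report the
// write as invisible to snapshot 15 even after the entry is evicted from the
// commit cache.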
WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  if (!db_impl_->shutting_down_) {
    db_impl_->CancelAllBackgroundWork(true /*wait*/);
  }
}
void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}
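// Counting example (illustrative): for a batch {Put(k1), Put(k2), Put(k1)},
// the second insert of k1 fails (it.second == false), so batches_ grows from
// its initial 1 to 2 and BatchCount() reports two duplicate-free sub-batches.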
}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE