// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn_db.h"

#include <cinttypes>
#include <unordered_set>

#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace ROCKSDB_NAMESPACE {
Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    ordered_seq_cnt[seq] = cnt;
  }
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }
  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));
  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}
Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}
Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}
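
// Illustrative caller sketch (not part of this file; `txn_db` is assumed to
// be a TransactionDB opened with write_policy == WRITE_PREPARED): the typical
// two-phase lifecycle that exercises AddPrepared/AddCommitted below.
//
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions());
//   txn->SetName("txn1");   // required before Prepare()
//   txn->Put("key", "value");
//   txn->Prepare();         // reserves the prepare sequence number(s)
//   txn->Commit();          // publishes the commit sequence number
//   delete txn;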
Status WritePreparedTxnDB::Write(const WriteOptions& opts,
                                 WriteBatch* updates) {
  if (txn_db_options_.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
  } else {
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts,
                                                                 updates);
  }
}
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations,
    WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts,
                                                                 updates);
  }
}
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    if (!s.ok()) {
      return s;
    }
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
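  // Illustrative example of the sub-batch count: a batch with
  // {Put(k1), Put(k2), Put(k1)} repeats k1, so SubBatchCounter splits it into
  // two sub-batches and batch_cnt becomes 2; each sub-batch then consumes its
  // own sequence number in the write below.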
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  // In the absence of Prepare markers, use Noop as a batch separator
  auto s = WriteBatchInternal::InsertNoop(batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  const bool kSeperatePrepareCommitBatches = true;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, db_impl_, batch_cnt,
      db_impl_->immutable_db_options().two_write_queues,
      !kSeperatePrepareCommitBatches);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
                          !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  write_options.disableWAL = true;
  write_options.sync = false;
  const size_t ONE_BATCH = 1;  // Just to inc the seq
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared is called from within PreReleaseCallback
  return s;
}
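
// Illustrative timeline (assumed values, two_write_queues == true): if the
// prepare-side WriteImpl above consumes sequence numbers 10..12 for a
// 3-sub-batch batch, then prepare_seq == 10 and the empty commit batch
// consumes one more sequence number (e.g. 13), which
// update_commit_map_with_prepare publishes as the commit seq of <10, 3>.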
Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  bool* dont_care = nullptr;
  DBImpl::GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.value_found = dont_care;
  get_impl_options.callback = &callback;
  auto res = db_impl_->GetImpl(options, key, get_impl_options);
  if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
                                                  backed_by_snapshot))) {
    return res;
  } else {
    WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}
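
// Note for callers (illustrative): Status::TryAgain() above is returned when
// the read snapshot could not be validated, roughly because max_evicted_seq_
// advanced past it mid-read; a caller without an explicit snapshot can simply
// retry the Get, which will pick up a fresh snapshot.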
void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (h->GetName() != "default") {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}
void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}
std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    stat_list[i] =
        this->Get(options, column_family[i], keys[i], &(*values)[i]);
  }
  return stat_list;
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
        snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}
Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  !ALLOW_BLOB, !ALLOW_REFRESH);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
  dummy_max_snapshot_.number_ = kMaxSequenceNumber;
}
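
// Illustrative arithmetic (assuming the default COMMIT_CACHE_BITS of 23):
// COMMIT_CACHE_SIZE = 2^23 = 8388608 entries, so INC_STEP_FOR_MAX_EVICTED =
// 8388608 / 100 = 83886, i.e., max_evicted_seq_ is bumped in steps of ~84k
// and advances at most ~100 times per full wrap of the commit cache.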
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
                                                 bool locked) {
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, saving an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  ROCKS_LOG_DETAILS(
      info_log_,
      "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
      prepared_txns_.empty(),
      prepared_txns_.empty() ? 0 : prepared_txns_.top());
  const SequenceNumber prepared_top = prepared_txns_.top();
  const bool empty = prepared_top == kMaxSequenceNumber;
  // Preliminary check to avoid the synchronization cost
  if (!empty && prepared_top <= new_max) {
    if (locked) {
      // Needed to avoid double locking in pop().
      prepared_txns_.push_pop_mutex()->Unlock();
    }
    WriteLock wl(&prepared_mutex_);
    // Need to fetch fresh values of ::top after mutex is acquired
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 ")",
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max);
      delayed_prepared_empty_.store(false, std::memory_order_release);
      // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
      // there will be a point in time that the entry is neither in
      // prepared_txns_ nor in delayed_prepared_, which will not be checked if
      // delayed_prepared_empty_ is false.
      prepared_txns_.pop();
    }
    if (locked) {
      prepared_txns_.push_pop_mutex()->Lock();
    }
  }
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
                    seq, max_evicted_seq_.load());
  TEST_SYNC_POINT("AddPrepared::begin:pause");
  TEST_SYNC_POINT("AddPrepared::begin:resume");
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Lock();
  }
  prepared_txns_.push_pop_mutex()->AssertHeld();
  prepared_txns_.push(seq);
  auto new_max = future_max_evicted_seq_.load();
  if (UNLIKELY(seq <= new_max)) {
    // This should not happen in normal case
    ROCKS_LOG_ERROR(
        info_log_,
        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
        " <= %" PRIu64,
        seq, new_max);
    CheckPreparedAgainstMax(new_max, true /*locked*/);
  }
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Unlock();
  }
  TEST_SYNC_POINT("AddPrepared::end");
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      auto last = db_impl_->GetLastPublishedSequence();  // could be 0
      SequenceNumber max_evicted_seq;
      if (LIKELY(evicted.commit_seq < last)) {
        assert(last > 0);
        // Inc max in larger steps to avoid frequent updates
        max_evicted_seq =
            std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
      } else {
        // legit when a commit entry in a write batch overwrites the previous
        // one
        max_evicted_seq = evicted.commit_seq;
      }
      ROCKS_LOG_DETAILS(info_log_,
                        "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
                        " => %lu",
                        prepare_seq, evicted.prep_seq, evicted.commit_seq,
                        prev_max, max_evicted_seq);
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
      WriteLock wl(&prepared_mutex_);
      for (auto dp : delayed_prepared_) {
        if (dp == evicted.prep_seq) {
          // This is a rare case that txn is committed but prepared_txns_ is not
          // cleaned up yet. Refer to delayed_prepared_commits_ definition for
          // why it should be kept updated.
          delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
          ROCKS_LOG_DEBUG(info_log_,
                          "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
                          evicted.prep_seq, evicted.commit_seq);
          break;
        }
      }
    }
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}
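
// Illustrative indexing example (cache size shrunk for exposition): with a
// commit cache of 8 slots instead of COMMIT_CACHE_SIZE, a commit
// <prepare_seq=21, commit_seq=24> maps to slot 21 % 8 = 5; whatever entry
// previously occupied slot 5 is evicted, and if its commit_seq exceeds
// max_evicted_seq_ the max is advanced as above before the entry is dropped.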
void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  TEST_SYNC_POINT_CALLBACK(
      "RemovePrepared:Start",
      const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  ROCKS_LOG_DETAILS(info_log_,
                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                    prepare_seq, batch_cnt);
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      auto it = delayed_prepared_commits_.find(prepare_seq + i);
      if (it != delayed_prepared_commits_.end()) {
        ROCKS_LOG_DETAILS(info_log_,
                          "delayed_prepared_commits_.erase %" PRIu64,
                          it->first);
        delayed_prepared_commits_.erase(it);
      }
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // Declare the intention before getting snapshot from the DB. This helps a
  // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
  // it has not already. Otherwise the new snapshot could be missed when we ask
  // the DB for snapshots smaller than the future max.
  auto updated_future_max = prev_max;
  while (updated_future_max < new_max &&
         !future_max_evicted_seq_.compare_exchange_weak(
             updated_future_max, new_max, std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
  };

  CheckPreparedAgainstMax(new_max, false /*locked*/);

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of snapshots to identify how fresh the
  // snapshot list is. This works because the snapshots are between 0 and
  // max, so the larger the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it is already updated
    // with a more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
    if (!snapshots.empty()) {
      WriteLock wl(&old_commit_map_mutex_);
      for (auto snap : snapshots) {
        // This allows IsInSnapshot to tell apart the reads from invalid
        // snapshots from the reads from committed values in valid snapshots.
        old_commit_map_[snap];
      }
      old_commit_map_empty_.store(false, std::memory_order_release);
    }
  }
  auto updated_prev_max = prev_max;
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}
const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  const bool kForWWConflictCheck = true;
  return GetSnapshotInternal(!kForWWConflictCheck);
}
SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
    bool for_ww_conflict_check) {
  // Note: for this optimization setting the last sequence number and obtaining
  // the smallest uncommitted seq should be done atomically. However to avoid
  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
  // snapshot. Since we always update the list of unprepared seq (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would be obtained otherwise atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
  assert(snap_impl);
  SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  // Note: Check against future_max_evicted_seq_ (in contrast with
  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
    // There is a very rare case in which the commit entry evicts another commit
    // entry that is not published yet thus advancing max evicted seq beyond the
    // last published seq. This case is not likely in real-world setup so we
    // handle it with a few retries.
    size_t retry = 0;
    SequenceNumber max;
    while ((max = future_max_evicted_seq_.load()) != 0 &&
           snap_impl->GetSequenceNumber() <= max && retry < 100) {
      ROCKS_LOG_WARN(info_log_,
                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
                     " retry %" ROCKSDB_PRIszt,
                     snap_impl->GetSequenceNumber(), max, retry);
      ReleaseSnapshot(snap_impl);
      // Wait for last visible seq to catch up with max, and also go beyond it
      // by one.
      AdvanceSeqByOne();
      snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
      assert(snap_impl);
      retry++;
    }
    assert(snap_impl->GetSequenceNumber() > max);
    if (snap_impl->GetSequenceNumber() <= max) {
      throw std::runtime_error(
          "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
          " after " + ToString(retry) +
          " retries is still less than future_max_evicted_seq_" +
          ToString(max));
    }
  }
  EnhanceSnapshot(snap_impl, min_uncommitted);
  ROCKS_LOG_DETAILS(
      db_impl_->immutable_db_options().info_log,
      "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
  return snap_impl;
}
void WritePreparedTxnDB::AdvanceSeqByOne() {
  // Inserting an empty value will i) let the max evicted entry to be
  // published, i.e., max == last_published, and ii) increase the last
  // published to be one beyond max, i.e., max < last_published.
  WriteOptions woptions;
  TransactionOptions txn_options;
  Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  std::hash<std::thread::id> hasher;
  char name[64];
  snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
  assert(strlen(name) < 64 - 1);
  Status s = txn0->SetName(name);
  assert(s.ok());
  if (s.ok()) {
    // Without prepare it would simply skip the commit
    s = txn0->Prepare();
  }
  assert(s.ok());
  if (s.ok()) {
    s = txn0->Commit();
  }
  assert(s.ok());
  delete txn0;
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  db_impl_->mutex()->AssertHeld();
  return db_impl_->snapshots().GetAll(nullptr, max);
}
void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // TODO(myabandeh): relaxed should be enough since the synchronization is
  // already done by snapshots_mutex_ under which this function is called.
  if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
    // Then this is a rare case that transaction did not finish before max
    // advances. It is expected for a few read-only backup snapshots. For such
    // snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}
void WritePreparedTxnDB::CleanupReleasedSnapshots(
    const std::vector<SequenceNumber>& new_snapshots,
    const std::vector<SequenceNumber>& old_snapshots) {
  auto newi = new_snapshots.begin();
  auto oldi = old_snapshots.begin();
  for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
    assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
    if (*newi == *oldi) {    // still not released
      auto value = *newi;
      while (newi != new_snapshots.end() && *newi == value) {
        newi++;
      }
      while (oldi != old_snapshots.end() && *oldi == value) {
        oldi++;
      }
    } else {
      assert(*newi > *oldi);  // *oldi is released
      ReleaseSnapshotInternal(*oldi);
      oldi++;
    }
  }
  // Everything remaining in old_snapshots has been released and must be
  // cleaned up
  for (; oldi != old_snapshots.end(); oldi++) {
    ReleaseSnapshotInternal(*oldi);
  }
}
void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear upper in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item
  // that is still valid in the new list is either written to the same place
  // in the array or it is written to a higher place before it gets
  // overwritten by another item. This guarantees that a reader that reads
  // the list bottom-up will eventually see a snapshot that repeats in the
  // update, either before it gets overwritten by the writer or
  // afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); ++it) {
    // Insert them to a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);

  // Note: this must be done after the snapshots data structures are updated
  // with the new list of snapshots.
  CleanupReleasedSnapshots(snapshots, snapshots_all_);
  snapshots_all_ = snapshots;

  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}
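
// Illustrative update (assumed values): if snapshot_cache_ holds [3, 5, 8]
// and the new list is [5, 8, 9], the loop above stores 5 to slot 0, 8 to
// slot 1, then 9 to slot 2. The surviving snapshot 5 is written to its new
// slot 0 before its old copy in slot 1 is overwritten by 8, so a reader
// scanning from the top slot downward observes 5 in at least one of the two
// slots at any point during the update.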
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before being overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  // We will set it to true if the borderline snapshot suggests that.
  bool search_larger_list = false;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    SequenceNumber snapshot_seq =
        snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (ip1 == SNAPSHOT_CACHE_SIZE) {  // borderline snapshot
      // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
      // then later also continue the search to larger snapshots
      search_larger_list = snapshot_seq < evicted.commit_seq;
    }
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
                   "> with %" ROCKSDB_PRIszt " snapshots",
                   evicted.prep_seq, evicted.commit_seq, cnt);
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      SequenceNumber snapshot_seq =
          snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed in
  // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
  // the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "old_commit_map_mutex_ overhead for %" PRIu64
                   " commit entry: <%" PRIu64 ",%" PRIu64 ">",
                   snapshot_seq, prep_seq, commit_seq);
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Returning true to
    // continue the search if there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}
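
// Illustrative example: for an evicted entry <prep_seq=10, commit_seq=20>,
// a snapshot at seq 15 overlaps (10 <= 15 < 20), so 10 is recorded in
// old_commit_map_[15]; a snapshot at 25 already includes the commit
// (20 <= 25) and needs no entry; a snapshot at 5 precedes the prepare, so
// the write is simply invisible to it and nothing is stored either.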
WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs have finished before destructing WritePreparedTxnDB.
  if (!db_impl_->shutting_down_) {
    db_impl_->CancelAllBackgroundWork(true /*wait*/);
  }
}
void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}
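
// Illustrative trace of AddKey for one CF with keys k1, k2, k1: the first two
// inserts succeed; the second insert of k1 returns it.second == false, so
// batches_ is incremented (a new sub-batch starts), the key sets are cleared,
// and k1 is re-inserted as the first key of the new sub-batch.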
}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE