// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/write_prepared_txn_db.h"

#include <inttypes.h>
#include <unordered_set>

#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
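// WritePreparedTxnDB implements the WritePrepared commit policy: a
// transaction's data is written to the memtable at Prepare time and becomes
// visible at Commit time, when its (prepare_seq, commit_seq) pair is
// published. The functions in this file maintain the structures readers
// consult to decide visibility: a fixed-size lock-free commit cache,
// max_evicted_seq_ for entries that aged out of the cache,
// prepared_txns_/delayed_prepared_ for in-flight prepare sequence numbers,
// and old_commit_map_ for evicted entries that still overlap live snapshots.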
namespace rocksdb {

Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    virtual Status Callback(SequenceNumber commit_seq,
                            bool is_mem_disabled) override {
#ifdef NDEBUG
      (void)is_mem_disabled;
#endif
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}
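// WritePrepared may write all the sub-batches of a transaction as one write
// batch, so the same key can legitimately appear more than once in a single
// batch. VerifyCFOptions therefore rejects memtable factories that cannot
// handle duplicate key insertions within a batch.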
Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}
Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations,
    WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::Write(opts, updates);
  }
}
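// WriteInternal commits a write batch outside of a transaction. With a single
// write queue the commit map is updated by a callback of the same write. With
// two write queues the batch is first written without sync (prepared via
// AddPreparedCallback), and a second, memtable-disabled write then publishes
// the commit sequence number to readers with the original sync setting.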
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    assert(s.ok());
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  bool sync = write_options.sync;
  if (!do_one_write) {
    // No need to sync on the first write
    write_options.sync = false;
  }
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(batch);
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, batch_cnt, db_impl_->immutable_db_options().two_write_queues);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
                               no_log_ref, !DISABLE_MEMTABLE, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  // Set the original value of sync
  write_options.sync = sync;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  const size_t ONE_BATCH = 1;
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after WriteImpl that published the
  // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
  RemovePrepared(prepare_seq, batch_cnt);
  return s;
}
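// Reads go through a WritePreparedTxnReadCallback so that values whose
// sequence numbers are prepared but not yet committed with respect to the
// read snapshot are skipped. When no snapshot is given, the latest committed
// value is returned by using kMaxSequenceNumber as the read snapshot.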
Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  // We are fine with the latest committed value. This could be done by
  // specifying the snapshot as kMaxSequenceNumber.
  SequenceNumber seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    min_uncommitted = SmallestUnCommittedSeq();
  }
  WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
  bool* dont_care = nullptr;
  // Note: no need to specify a snapshot for read options as no specific
  // snapshot is requested by the user.
  return db_impl_->GetImpl(options, column_family, key, value, dont_care,
                           &callback);
}
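// UpdateCFComparatorMap rebuilds the id-to-comparator and id-to-handle maps
// from scratch and installs the new copies instead of mutating the existing
// maps in place. For the default column family the handle maintained by the
// DB is stored, since the caller-provided handle may later be deleted.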
void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}
void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}
std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    std::string* value = values ? &(*values)[i] : nullptr;
    stat_list[i] = this->Get(options, column_family[i], keys[i], value);
  }
  return stat_list;
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted), snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace
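// If the caller does not pass a snapshot, NewIterator/NewIterators take one
// internally and keep it alive through the ManagedSnapshot owned by the
// IteratorState above; RegisterCleanup releases that state (and hence the
// snapshot) when the iterator is destroyed. Holding the snapshot keeps the
// related entries in the commit map from being garbage collected while the
// iterator is in use.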
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}
Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  !ALLOW_BLOB, !ALLOW_REFRESH);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}
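// Init sizes the two fixed, lock-free arrays used on the read/commit path:
// snapshot_cache_ caches the lowest live snapshots and commit_cache_ maps
// (seq % COMMIT_CACHE_SIZE) to its commit entry. INC_STEP_FOR_MAX_EVICTED
// controls how far max_evicted_seq_ jumps ahead on each eviction so that it
// is advanced at most ~100 times per wrap-around of the commit cache.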
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing", seq);
  assert(seq > max_evicted_seq_);
  if (seq <= max_evicted_seq_) {
    throw std::runtime_error(
        "Added prepare_seq is not larger than max_evicted_seq_: " +
        ToString(seq) + " <= " + ToString(max_evicted_seq_.load()));
  }
  WriteLock wl(&prepared_mutex_);
  prepared_txns_.push(seq);
}
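// AddCommitted publishes the (prepare_seq, commit_seq) pair in the commit
// cache. Because the cache is a fixed-size array indexed by
// prepare_seq % COMMIT_CACHE_SIZE, inserting may evict an older entry; the
// evicted entry advances max_evicted_seq_ (in large steps) and, if it still
// overlaps a live snapshot, is preserved in old_commit_map_ via
// CheckAgainstSnapshots. The CAS-based ExchangeCommitEntry can fail if another
// thread updated the same slot first, in which case the insert is retried.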
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      // Inc max in larger steps to avoid frequent updates
      auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}
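// RemovePrepared is called once the prepared sequence numbers are no longer
// needed, e.g. after the commit entries have been published. It erases one
// entry per sub-batch from prepared_txns_ and, if necessary, from
// delayed_prepared_, keeping delayed_prepared_empty_ in sync so readers can
// skip the locked lookup in the common case.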
void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}
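// AdvanceMaxEvictedSeq raises max_evicted_seq_ to new_max in three steps:
// (i) any prepared sequence number at or below new_max is moved from
// prepared_txns_ to delayed_prepared_, so readers can still detect it as
// uncommitted; (ii) the list of live snapshots below new_max is refreshed so
// that CheckAgainstSnapshots can preserve evicted-but-overlapping commit
// entries; (iii) max_evicted_seq_ itself is advanced with a CAS loop that
// never moves it backwards under concurrent updates.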
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_ and saves an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  {
    WriteLock wl(&prepared_mutex_);
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 " oldmax=%" PRIu64,
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max, prev_max);
      prepared_txns_.pop();
      delayed_prepared_empty_.store(false, std::memory_order_release);
    }
  }

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of the snapshot list to identify how fresh it
  // is. This works because the snapshots are between 0 and max, so the larger
  // the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it has already been updated
    // with a more recent version by a concurrent thread.
    update_snapshots = true;
    // We only care about snapshots lower than max.
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
  }
  auto updated_prev_max = prev_max;
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}
const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  // Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
  // to WritePreparedTxn::SetSnapshot for more explanation.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  const bool FOR_WW_CONFLICT_CHECK = true;
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
  assert(snap_impl);
  EnhanceSnapshot(snap_impl, min_uncommitted);
  return snap_impl;
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  return db_impl_->snapshots().GetAll(nullptr, max);
}
void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
  auto snap_seq = snapshot->GetSequenceNumber();
  ReleaseSnapshotInternal(snap_seq);
  db_impl_->ReleaseSnapshot(snapshot);
}
void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // relaxed is enough since max increases monotonically, i.e., if snap_seq <
  // old_max => snap_seq < new_max as well.
  if (snap_seq < max_evicted_seq_.load(std::memory_order_relaxed)) {
    // Then this is a rare case in which the transaction did not finish before
    // max advances. It is expected for a few read-only backup snapshots. For
    // such snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}
void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear higher in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item that is
  // still valid in the new list is either written to the same place in
  // the array or it is written to a higher place before it gets
  // overwritten by another item. This guarantees that a reader that reads the
  // list bottom-up will eventually see a snapshot that repeats in the
  // update, either before it gets overwritten by the writer or
  // afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use a lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); it++) {
    // Insert them into a vector that is less efficient to access
    // concurrently.
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}
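// CheckAgainstSnapshots is called for each entry evicted from the commit
// cache. It scans the live snapshot list (the lock-free snapshot_cache_
// first, then the mutex-protected snapshots_ overflow list) and, via
// MaybeUpdateOldCommitMap, records the evicted prepare_seq under every
// snapshot that falls between prepare_seq and commit_seq, so readers of those
// snapshots can still tell that the value is not visible to them.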
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before getting overwritten, the reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
               snapshot_seq < evicted.prep_seq)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed
  // in all snapshots. If commit_seq <= snapshot_seq, it is considered already
  // in the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search if there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}
WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush jobs holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs have finished before destructing WritePreparedTxnDB.
  db_impl_->CancelAllBackgroundWork(true /*wait*/);
}
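// SubBatchCounter is used by WriteInternal to figure out how many sub-batches
// a write batch contains: a new sub-batch starts whenever a key repeats
// within the same column family, since each duplicate needs its own sequence
// number in the memtable. The per-CF key sets below are reset each time a
// duplicate is found.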
void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}
}  // namespace rocksdb
#endif  // ROCKSDB_LITE