]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/write_prepared_txn_db.h"
9
10 #include <algorithm>
11 #include <cinttypes>
12 #include <string>
13 #include <unordered_set>
14 #include <vector>
15
16 #include "db/arena_wrapped_db_iter.h"
17 #include "db/db_impl/db_impl.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/utilities/transaction_db.h"
21 #include "test_util/sync_point.h"
22 #include "util/cast_util.h"
23 #include "util/mutexlock.h"
24 #include "util/string_util.h"
25 #include "utilities/transactions/pessimistic_transaction.h"
26 #include "utilities/transactions/transaction_db_mutex_impl.h"
27
28 namespace ROCKSDB_NAMESPACE {
29
30 Status WritePreparedTxnDB::Initialize(
31 const std::vector<size_t>& compaction_enabled_cf_indices,
32 const std::vector<ColumnFamilyHandle*>& handles) {
33 auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
34 assert(dbimpl != nullptr);
35 auto rtxns = dbimpl->recovered_transactions();
36 std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
37 for (auto rtxn : rtxns) {
38 // There should only one batch for WritePrepared policy.
39 assert(rtxn.second->batches_.size() == 1);
40 const auto& seq = rtxn.second->batches_.begin()->first;
41 const auto& batch_info = rtxn.second->batches_.begin()->second;
42 auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
43 ordered_seq_cnt[seq] = cnt;
44 }
45 // AddPrepared must be called in order
46 for (auto seq_cnt : ordered_seq_cnt) {
47 auto seq = seq_cnt.first;
48 auto cnt = seq_cnt.second;
49 for (size_t i = 0; i < cnt; i++) {
50 AddPrepared(seq + i);
51 }
52 }
53 SequenceNumber prev_max = max_evicted_seq_;
54 SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
55 AdvanceMaxEvictedSeq(prev_max, last_seq);
56 // Create a gap between max and the next snapshot. This simplifies the logic
57 // in IsInSnapshot by not having to consider the special case of max ==
58 // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
59 if (last_seq) {
60 db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
61 db_impl_->versions_->SetLastSequence(last_seq + 1);
62 db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
63 }
64
65 db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
66 // A callback to commit a single sub-batch
67 class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
68 public:
69 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
70 : db_(db) {}
71 Status Callback(SequenceNumber commit_seq,
72 bool is_mem_disabled __attribute__((__unused__)), uint64_t,
73 size_t /*index*/, size_t /*total*/) override {
74 assert(!is_mem_disabled);
75 db_->AddCommitted(commit_seq, commit_seq);
76 return Status::OK();
77 }
78
79 private:
80 WritePreparedTxnDB* db_;
81 };
82 db_impl_->SetRecoverableStatePreReleaseCallback(
83 new CommitSubBatchPreReleaseCallback(this));
84
85 auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
86 handles);
87 return s;
88 }
89
90 Status WritePreparedTxnDB::VerifyCFOptions(
91 const ColumnFamilyOptions& cf_options) {
92 Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
93 if (!s.ok()) {
94 return s;
95 }
96 if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
97 return Status::InvalidArgument(
98 "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
99 "WritePrpeared transactions");
100 }
101 return Status::OK();
102 }
103
104 Transaction* WritePreparedTxnDB::BeginTransaction(
105 const WriteOptions& write_options, const TransactionOptions& txn_options,
106 Transaction* old_txn) {
107 if (old_txn != nullptr) {
108 ReinitializeTransaction(old_txn, write_options, txn_options);
109 return old_txn;
110 } else {
111 return new WritePreparedTxn(this, write_options, txn_options);
112 }
113 }
114
115 Status WritePreparedTxnDB::Write(const WriteOptions& opts,
116 WriteBatch* updates) {
117 if (txn_db_options_.skip_concurrency_control) {
118 // Skip locking the rows
119 const size_t UNKNOWN_BATCH_CNT = 0;
120 WritePreparedTxn* NO_TXN = nullptr;
121 return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
122 } else {
123 return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
124 }
125 }
126
127 Status WritePreparedTxnDB::Write(
128 const WriteOptions& opts,
129 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
130 if (optimizations.skip_concurrency_control) {
131 // Skip locking the rows
132 const size_t UNKNOWN_BATCH_CNT = 0;
133 const size_t ONE_BATCH_CNT = 1;
134 const size_t batch_cnt = optimizations.skip_duplicate_key_check
135 ? ONE_BATCH_CNT
136 : UNKNOWN_BATCH_CNT;
137 WritePreparedTxn* NO_TXN = nullptr;
138 return WriteInternal(opts, updates, batch_cnt, NO_TXN);
139 } else {
140 // TODO(myabandeh): Make use of skip_duplicate_key_check hint
141 // Fall back to unoptimized version
142 return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
143 }
144 }
145
146 Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
147 WriteBatch* batch, size_t batch_cnt,
148 WritePreparedTxn* txn) {
149 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
150 "CommitBatchInternal");
151 if (batch->Count() == 0) {
152 // Otherwise our 1 seq per batch logic will break since there is no seq
153 // increased for this batch.
154 return Status::OK();
155 }
156 if (batch_cnt == 0) { // not provided, then compute it
157 // TODO(myabandeh): add an option to allow user skipping this cost
158 SubBatchCounter counter(*GetCFComparatorMap());
159 auto s = batch->Iterate(&counter);
160 if (!s.ok()) {
161 return s;
162 }
163 batch_cnt = counter.BatchCount();
164 WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
165 ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
166 static_cast<uint64_t>(batch_cnt));
167 }
168 assert(batch_cnt);
169
170 bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
171 WriteOptions write_options(write_options_orig);
172 // In the absence of Prepare markers, use Noop as a batch separator
173 auto s = WriteBatchInternal::InsertNoop(batch);
174 assert(s.ok());
175 const bool DISABLE_MEMTABLE = true;
176 const uint64_t no_log_ref = 0;
177 uint64_t seq_used = kMaxSequenceNumber;
178 const size_t ZERO_PREPARES = 0;
179 const bool kSeperatePrepareCommitBatches = true;
180 // Since this is not 2pc, there is no need for AddPrepared but having it in
181 // the PreReleaseCallback enables an optimization. Refer to
182 // SmallestUnCommittedSeq for more details.
183 AddPreparedCallback add_prepared_callback(
184 this, db_impl_, batch_cnt,
185 db_impl_->immutable_db_options().two_write_queues,
186 !kSeperatePrepareCommitBatches);
187 WritePreparedCommitEntryPreReleaseCallback update_commit_map(
188 this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
189 PreReleaseCallback* pre_release_callback;
190 if (do_one_write) {
191 pre_release_callback = &update_commit_map;
192 } else {
193 pre_release_callback = &add_prepared_callback;
194 }
195 s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
196 !DISABLE_MEMTABLE, &seq_used, batch_cnt,
197 pre_release_callback);
198 assert(!s.ok() || seq_used != kMaxSequenceNumber);
199 uint64_t prepare_seq = seq_used;
200 if (txn != nullptr) {
201 txn->SetId(prepare_seq);
202 }
203 if (!s.ok()) {
204 return s;
205 }
206 if (do_one_write) {
207 return s;
208 } // else do the 2nd write for commit
209 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
210 "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
211 prepare_seq);
212 // Commit the batch by writing an empty batch to the 2nd queue that will
213 // release the commit sequence number to readers.
214 const size_t ZERO_COMMITS = 0;
215 WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
216 this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
217 WriteBatch empty_batch;
218 write_options.disableWAL = true;
219 write_options.sync = false;
220 const size_t ONE_BATCH = 1; // Just to inc the seq
221 s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
222 no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
223 &update_commit_map_with_prepare);
224 assert(!s.ok() || seq_used != kMaxSequenceNumber);
225 // Note: RemovePrepared is called from within PreReleaseCallback
226 return s;
227 }
228
229 Status WritePreparedTxnDB::Get(const ReadOptions& options,
230 ColumnFamilyHandle* column_family,
231 const Slice& key, PinnableSlice* value) {
232 SequenceNumber min_uncommitted, snap_seq;
233 const SnapshotBackup backed_by_snapshot =
234 AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
235 WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
236 backed_by_snapshot);
237 bool* dont_care = nullptr;
238 DBImpl::GetImplOptions get_impl_options;
239 get_impl_options.column_family = column_family;
240 get_impl_options.value = value;
241 get_impl_options.value_found = dont_care;
242 get_impl_options.callback = &callback;
243 auto res = db_impl_->GetImpl(options, key, get_impl_options);
244 if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
245 backed_by_snapshot))) {
246 return res;
247 } else {
248 WPRecordTick(TXN_GET_TRY_AGAIN);
249 return Status::TryAgain();
250 }
251 }
252
253 void WritePreparedTxnDB::UpdateCFComparatorMap(
254 const std::vector<ColumnFamilyHandle*>& handles) {
255 auto cf_map = new std::map<uint32_t, const Comparator*>();
256 auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
257 for (auto h : handles) {
258 auto id = h->GetID();
259 const Comparator* comparator = h->GetComparator();
260 (*cf_map)[id] = comparator;
261 if (id != 0) {
262 (*handle_map)[id] = h;
263 } else {
264 // The pointer to the default cf handle in the handles will be deleted.
265 // Use the pointer maintained by the db instead.
266 (*handle_map)[id] = DefaultColumnFamily();
267 }
268 }
269 cf_map_.reset(cf_map);
270 handle_map_.reset(handle_map);
271 }
272
273 void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
274 auto old_cf_map_ptr = cf_map_.get();
275 assert(old_cf_map_ptr);
276 auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
277 auto old_handle_map_ptr = handle_map_.get();
278 assert(old_handle_map_ptr);
279 auto handle_map =
280 new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
281 auto id = h->GetID();
282 const Comparator* comparator = h->GetComparator();
283 (*cf_map)[id] = comparator;
284 (*handle_map)[id] = h;
285 cf_map_.reset(cf_map);
286 handle_map_.reset(handle_map);
287 }
288
289
290 std::vector<Status> WritePreparedTxnDB::MultiGet(
291 const ReadOptions& options,
292 const std::vector<ColumnFamilyHandle*>& column_family,
293 const std::vector<Slice>& keys, std::vector<std::string>* values) {
294 assert(values);
295 size_t num_keys = keys.size();
296 values->resize(num_keys);
297
298 std::vector<Status> stat_list(num_keys);
299 for (size_t i = 0; i < num_keys; ++i) {
300 stat_list[i] = this->Get(options, column_family[i], keys[i], &(*values)[i]);
301 }
302 return stat_list;
303 }
304
305 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
306 struct WritePreparedTxnDB::IteratorState {
307 IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
308 std::shared_ptr<ManagedSnapshot> s,
309 SequenceNumber min_uncommitted)
310 : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
311 snapshot(s) {}
312
313 WritePreparedTxnReadCallback callback;
314 std::shared_ptr<ManagedSnapshot> snapshot;
315 };
316
317 namespace {
318 static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
319 delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
320 }
321 } // anonymous namespace
322
323 Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
324 ColumnFamilyHandle* column_family) {
325 constexpr bool ALLOW_BLOB = true;
326 constexpr bool ALLOW_REFRESH = true;
327 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
328 SequenceNumber snapshot_seq = kMaxSequenceNumber;
329 SequenceNumber min_uncommitted = 0;
330 if (options.snapshot != nullptr) {
331 snapshot_seq = options.snapshot->GetSequenceNumber();
332 min_uncommitted =
333 static_cast_with_check<const SnapshotImpl>(options.snapshot)
334 ->min_uncommitted_;
335 } else {
336 auto* snapshot = GetSnapshot();
337 // We take a snapshot to make sure that the related data in the commit map
338 // are not deleted.
339 snapshot_seq = snapshot->GetSequenceNumber();
340 min_uncommitted =
341 static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
342 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
343 }
344 assert(snapshot_seq != kMaxSequenceNumber);
345 auto* cfd =
346 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
347 auto* state =
348 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
349 auto* db_iter =
350 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
351 !ALLOW_BLOB, !ALLOW_REFRESH);
352 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
353 return db_iter;
354 }
355
356 Status WritePreparedTxnDB::NewIterators(
357 const ReadOptions& options,
358 const std::vector<ColumnFamilyHandle*>& column_families,
359 std::vector<Iterator*>* iterators) {
360 constexpr bool ALLOW_BLOB = true;
361 constexpr bool ALLOW_REFRESH = true;
362 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
363 SequenceNumber snapshot_seq = kMaxSequenceNumber;
364 SequenceNumber min_uncommitted = 0;
365 if (options.snapshot != nullptr) {
366 snapshot_seq = options.snapshot->GetSequenceNumber();
367 min_uncommitted =
368 static_cast_with_check<const SnapshotImpl>(options.snapshot)
369 ->min_uncommitted_;
370 } else {
371 auto* snapshot = GetSnapshot();
372 // We take a snapshot to make sure that the related data in the commit map
373 // are not deleted.
374 snapshot_seq = snapshot->GetSequenceNumber();
375 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
376 min_uncommitted =
377 static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
378 }
379 iterators->clear();
380 iterators->reserve(column_families.size());
381 for (auto* column_family : column_families) {
382 auto* cfd =
383 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
384 auto* state =
385 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
386 auto* db_iter =
387 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
388 !ALLOW_BLOB, !ALLOW_REFRESH);
389 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
390 iterators->push_back(db_iter);
391 }
392 return Status::OK();
393 }
394
395 void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
396 // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
397 // around.
398 INC_STEP_FOR_MAX_EVICTED =
399 std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
400 snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
401 new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
402 commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
403 new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
404 dummy_max_snapshot_.number_ = kMaxSequenceNumber;
405 }
406
407 void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
408 bool locked) {
409 // When max_evicted_seq_ advances, move older entries from prepared_txns_
410 // to delayed_prepared_. This guarantees that if a seq is lower than max,
411 // then it is not in prepared_txns_ and save an expensive, synchronized
412 // lookup from a shared set. delayed_prepared_ is expected to be empty in
413 // normal cases.
414 ROCKS_LOG_DETAILS(
415 info_log_,
416 "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
417 prepared_txns_.empty(),
418 prepared_txns_.empty() ? 0 : prepared_txns_.top());
419 const SequenceNumber prepared_top = prepared_txns_.top();
420 const bool empty = prepared_top == kMaxSequenceNumber;
421 // Preliminary check to avoid the synchronization cost
422 if (!empty && prepared_top <= new_max) {
423 if (locked) {
424 // Needed to avoid double locking in pop().
425 prepared_txns_.push_pop_mutex()->Unlock();
426 }
427 WriteLock wl(&prepared_mutex_);
428 // Need to fetch fresh values of ::top after mutex is acquired
429 while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
430 auto to_be_popped = prepared_txns_.top();
431 delayed_prepared_.insert(to_be_popped);
432 ROCKS_LOG_WARN(info_log_,
433 "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
434 " new_max=%" PRIu64,
435 static_cast<uint64_t>(delayed_prepared_.size()),
436 to_be_popped, new_max);
437 delayed_prepared_empty_.store(false, std::memory_order_release);
438 // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
439 // there will be a point in time that the entry is neither in
440 // prepared_txns_ nor in delayed_prepared_, which will not be checked if
441 // delayed_prepared_empty_ is false.
442 prepared_txns_.pop();
443 }
444 if (locked) {
445 prepared_txns_.push_pop_mutex()->Lock();
446 }
447 }
448 }
449
450 void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
451 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
452 seq, max_evicted_seq_.load());
453 TEST_SYNC_POINT("AddPrepared::begin:pause");
454 TEST_SYNC_POINT("AddPrepared::begin:resume");
455 if (!locked) {
456 prepared_txns_.push_pop_mutex()->Lock();
457 }
458 prepared_txns_.push_pop_mutex()->AssertHeld();
459 prepared_txns_.push(seq);
460 auto new_max = future_max_evicted_seq_.load();
461 if (UNLIKELY(seq <= new_max)) {
462 // This should not happen in normal case
463 ROCKS_LOG_ERROR(
464 info_log_,
465 "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
466 " <= %" PRIu64,
467 seq, new_max);
468 CheckPreparedAgainstMax(new_max, true /*locked*/);
469 }
470 if (!locked) {
471 prepared_txns_.push_pop_mutex()->Unlock();
472 }
473 TEST_SYNC_POINT("AddPrepared::end");
474 }
475
476 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
477 uint8_t loop_cnt) {
478 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
479 prepare_seq, commit_seq);
480 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
481 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
482 auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
483 CommitEntry64b evicted_64b;
484 CommitEntry evicted;
485 bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
486 if (LIKELY(to_be_evicted)) {
487 assert(evicted.prep_seq != prepare_seq);
488 auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
489 ROCKS_LOG_DETAILS(info_log_,
490 "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
491 evicted.prep_seq, evicted.commit_seq, prev_max);
492 if (prev_max < evicted.commit_seq) {
493 auto last = db_impl_->GetLastPublishedSequence(); // could be 0
494 SequenceNumber max_evicted_seq;
495 if (LIKELY(evicted.commit_seq < last)) {
496 assert(last > 0);
497 // Inc max in larger steps to avoid frequent updates
498 max_evicted_seq =
499 std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
500 } else {
501 // legit when a commit entry in a write batch overwrite the previous one
502 max_evicted_seq = evicted.commit_seq;
503 }
504 ROCKS_LOG_DETAILS(info_log_,
505 "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
506 " => %lu",
507 prepare_seq, evicted.prep_seq, evicted.commit_seq,
508 prev_max, max_evicted_seq);
509 AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
510 }
511 // After each eviction from commit cache, check if the commit entry should
512 // be kept around because it overlaps with a live snapshot.
513 CheckAgainstSnapshots(evicted);
514 if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
515 WriteLock wl(&prepared_mutex_);
516 for (auto dp : delayed_prepared_) {
517 if (dp == evicted.prep_seq) {
518 // This is a rare case that txn is committed but prepared_txns_ is not
519 // cleaned up yet. Refer to delayed_prepared_commits_ definition for
520 // why it should be kept updated.
521 delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
522 ROCKS_LOG_DEBUG(info_log_,
523 "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
524 evicted.prep_seq, evicted.commit_seq);
525 break;
526 }
527 }
528 }
529 }
530 bool succ =
531 ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
532 if (UNLIKELY(!succ)) {
533 ROCKS_LOG_ERROR(info_log_,
534 "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
535 ",%" PRIu64 " retrying...",
536 indexed_seq, prepare_seq, commit_seq);
537 // A very rare event, in which the commit entry is updated before we do.
538 // Here we apply a very simple solution of retrying.
539 if (loop_cnt > 100) {
540 throw std::runtime_error("Infinite loop in AddCommitted!");
541 }
542 AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
543 return;
544 }
545 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
546 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
547 }
548
549 void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
550 const size_t batch_cnt) {
551 TEST_SYNC_POINT_CALLBACK(
552 "RemovePrepared:Start",
553 const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
554 TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
555 TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
556 ROCKS_LOG_DETAILS(info_log_,
557 "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
558 prepare_seq, batch_cnt);
559 WriteLock wl(&prepared_mutex_);
560 for (size_t i = 0; i < batch_cnt; i++) {
561 prepared_txns_.erase(prepare_seq + i);
562 bool was_empty = delayed_prepared_.empty();
563 if (!was_empty) {
564 delayed_prepared_.erase(prepare_seq + i);
565 auto it = delayed_prepared_commits_.find(prepare_seq + i);
566 if (it != delayed_prepared_commits_.end()) {
567 ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
568 prepare_seq + i);
569 delayed_prepared_commits_.erase(it);
570 }
571 bool is_empty = delayed_prepared_.empty();
572 if (was_empty != is_empty) {
573 delayed_prepared_empty_.store(is_empty, std::memory_order_release);
574 }
575 }
576 }
577 }
578
579 bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
580 CommitEntry64b* entry_64b,
581 CommitEntry* entry) const {
582 *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(std::memory_order_acquire);
583 bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
584 return valid;
585 }
586
587 bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
588 const CommitEntry& new_entry,
589 CommitEntry* evicted_entry) {
590 CommitEntry64b new_entry_64b(new_entry, FORMAT);
591 CommitEntry64b evicted_entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
592 new_entry_64b, std::memory_order_acq_rel);
593 bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
594 return valid;
595 }
596
597 bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
598 CommitEntry64b& expected_entry_64b,
599 const CommitEntry& new_entry) {
600 auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
601 CommitEntry64b new_entry_64b(new_entry, FORMAT);
602 bool succ = atomic_entry.compare_exchange_strong(
603 expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
604 std::memory_order_acquire);
605 return succ;
606 }
607
608 void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
609 const SequenceNumber& new_max) {
610 ROCKS_LOG_DETAILS(info_log_,
611 "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
612 prev_max, new_max);
613 // Declare the intention before getting snapshot from the DB. This helps a
614 // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
615 // it has not already. Otherwise the new snapshot is when we ask DB for
616 // snapshots smaller than future max.
617 auto updated_future_max = prev_max;
618 while (updated_future_max < new_max &&
619 !future_max_evicted_seq_.compare_exchange_weak(
620 updated_future_max, new_max, std::memory_order_acq_rel,
621 std::memory_order_relaxed)) {
622 };
623
624 CheckPreparedAgainstMax(new_max, false /*locked*/);
625
626 // With each change to max_evicted_seq_ fetch the live snapshots behind it.
627 // We use max as the version of snapshots to identify how fresh are the
628 // snapshot list. This works because the snapshots are between 0 and
629 // max, so the larger the max, the more complete they are.
630 SequenceNumber new_snapshots_version = new_max;
631 std::vector<SequenceNumber> snapshots;
632 bool update_snapshots = false;
633 if (new_snapshots_version > snapshots_version_) {
634 // This is to avoid updating the snapshots_ if it already updated
635 // with a more recent vesion by a concrrent thread
636 update_snapshots = true;
637 // We only care about snapshots lower then max
638 snapshots = GetSnapshotListFromDB(new_max);
639 }
640 if (update_snapshots) {
641 UpdateSnapshots(snapshots, new_snapshots_version);
642 if (!snapshots.empty()) {
643 WriteLock wl(&old_commit_map_mutex_);
644 for (auto snap : snapshots) {
645 // This allows IsInSnapshot to tell apart the reads from in valid
646 // snapshots from the reads from committed values in valid snapshots.
647 old_commit_map_[snap];
648 }
649 old_commit_map_empty_.store(false, std::memory_order_release);
650 }
651 }
652 auto updated_prev_max = prev_max;
653 TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
654 TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
655 while (updated_prev_max < new_max &&
656 !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
657 std::memory_order_acq_rel,
658 std::memory_order_relaxed)) {
659 };
660 }
661
662 const Snapshot* WritePreparedTxnDB::GetSnapshot() {
663 const bool kForWWConflictCheck = true;
664 return GetSnapshotInternal(!kForWWConflictCheck);
665 }
666
667 SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
668 bool for_ww_conflict_check) {
669 // Note: for this optimization setting the last sequence number and obtaining
670 // the smallest uncommitted seq should be done atomically. However to avoid
671 // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
672 // snapshot. Since we always updated the list of unprepared seq (via
673 // AddPrepared) AFTER the last sequence is updated, this guarantees that the
674 // smallest uncommitted seq that we pair with the snapshot is smaller or equal
675 // the value that would be obtained otherwise atomically. That is ok since
676 // this optimization works as long as min_uncommitted is less than or equal
677 // than the smallest uncommitted seq when the snapshot was taken.
678 auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
679 SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
680 TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
681 assert(snap_impl);
682 SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
683 // Note: Check against future_max_evicted_seq_ (in contrast with
684 // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
685 if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
686 // There is a very rare case in which the commit entry evicts another commit
687 // entry that is not published yet thus advancing max evicted seq beyond the
688 // last published seq. This case is not likely in real-world setup so we
689 // handle it with a few retries.
690 size_t retry = 0;
691 SequenceNumber max;
692 while ((max = future_max_evicted_seq_.load()) != 0 &&
693 snap_impl->GetSequenceNumber() <= max && retry < 100) {
694 ROCKS_LOG_WARN(info_log_,
695 "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
696 " retry %" ROCKSDB_PRIszt,
697 snap_impl->GetSequenceNumber(), max, retry);
698 ReleaseSnapshot(snap_impl);
699 // Wait for last visible seq to catch up with max, and also go beyond it
700 // by one.
701 AdvanceSeqByOne();
702 snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
703 assert(snap_impl);
704 retry++;
705 }
706 assert(snap_impl->GetSequenceNumber() > max);
707 if (snap_impl->GetSequenceNumber() <= max) {
708 throw std::runtime_error(
709 "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
710 " after " + ToString(retry) +
711 " retries is still less than futre_max_evicted_seq_" + ToString(max));
712 }
713 }
714 EnhanceSnapshot(snap_impl, min_uncommitted);
715 ROCKS_LOG_DETAILS(
716 db_impl_->immutable_db_options().info_log,
717 "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
718 snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
719 TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
720 return snap_impl;
721 }
722
723 void WritePreparedTxnDB::AdvanceSeqByOne() {
724 // Inserting an empty value will i) let the max evicted entry to be
725 // published, i.e., max == last_published, increase the last published to
726 // be one beyond max, i.e., max < last_published.
727 WriteOptions woptions;
728 TransactionOptions txn_options;
729 Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
730 std::hash<std::thread::id> hasher;
731 char name[64];
732 snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
733 assert(strlen(name) < 64 - 1);
734 Status s = txn0->SetName(name);
735 assert(s.ok());
736 if (s.ok()) {
737 // Without prepare it would simply skip the commit
738 s = txn0->Prepare();
739 }
740 assert(s.ok());
741 if (s.ok()) {
742 s = txn0->Commit();
743 }
744 assert(s.ok());
745 delete txn0;
746 }
747
748 const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
749 SequenceNumber max) {
750 ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
751 InstrumentedMutexLock dblock(db_impl_->mutex());
752 db_impl_->mutex()->AssertHeld();
753 return db_impl_->snapshots().GetAll(nullptr, max);
754 }
755
756 void WritePreparedTxnDB::ReleaseSnapshotInternal(
757 const SequenceNumber snap_seq) {
758 // TODO(myabandeh): relax should enough since the synchronizatin is already
759 // done by snapshots_mutex_ under which this function is called.
760 if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
761 // Then this is a rare case that transaction did not finish before max
762 // advances. It is expected for a few read-only backup snapshots. For such
763 // snapshots we might have kept around a couple of entries in the
764 // old_commit_map_. Check and do garbage collection if that is the case.
765 bool need_gc = false;
766 {
767 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
768 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
769 snap_seq);
770 ReadLock rl(&old_commit_map_mutex_);
771 auto prep_set_entry = old_commit_map_.find(snap_seq);
772 need_gc = prep_set_entry != old_commit_map_.end();
773 }
774 if (need_gc) {
775 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
776 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
777 snap_seq);
778 WriteLock wl(&old_commit_map_mutex_);
779 old_commit_map_.erase(snap_seq);
780 old_commit_map_empty_.store(old_commit_map_.empty(),
781 std::memory_order_release);
782 }
783 }
784 }
785
786 void WritePreparedTxnDB::CleanupReleasedSnapshots(
787 const std::vector<SequenceNumber>& new_snapshots,
788 const std::vector<SequenceNumber>& old_snapshots) {
789 auto newi = new_snapshots.begin();
790 auto oldi = old_snapshots.begin();
791 for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
792 assert(*newi >= *oldi); // cannot have new snapshots with lower seq
793 if (*newi == *oldi) { // still not released
794 auto value = *newi;
795 while (newi != new_snapshots.end() && *newi == value) {
796 newi++;
797 }
798 while (oldi != old_snapshots.end() && *oldi == value) {
799 oldi++;
800 }
801 } else {
802 assert(*newi > *oldi); // *oldi is released
803 ReleaseSnapshotInternal(*oldi);
804 oldi++;
805 }
806 }
807 // Everything remained in old_snapshots is released and must be cleaned up
808 for (; oldi != old_snapshots.end(); oldi++) {
809 ReleaseSnapshotInternal(*oldi);
810 }
811 }
812
813 void WritePreparedTxnDB::UpdateSnapshots(
814 const std::vector<SequenceNumber>& snapshots,
815 const SequenceNumber& version) {
816 ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
817 version);
818 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
819 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
820 #ifndef NDEBUG
821 size_t sync_i = 0;
822 #endif
823 ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
824 WriteLock wl(&snapshots_mutex_);
825 snapshots_version_ = version;
826 // We update the list concurrently with the readers.
827 // Both new and old lists are sorted and the new list is subset of the
828 // previous list plus some new items. Thus if a snapshot repeats in
829 // both new and old lists, it will appear upper in the new list. So if
830 // we simply insert the new snapshots in order, if an overwritten item
831 // is still valid in the new list is either written to the same place in
832 // the array or it is written in a higher palce before it gets
833 // overwritten by another item. This guarantess a reader that reads the
834 // list bottom-up will eventaully see a snapshot that repeats in the
835 // update, either before it gets overwritten by the writer or
836 // afterwards.
837 size_t i = 0;
838 auto it = snapshots.begin();
839 for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
840 snapshot_cache_[i].store(*it, std::memory_order_release);
841 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
842 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
843 }
844 #ifndef NDEBUG
845 // Release the remaining sync points since they are useless given that the
846 // reader would also use lock to access snapshots
847 for (++sync_i; sync_i <= 10; ++sync_i) {
848 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
849 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
850 }
851 #endif
852 snapshots_.clear();
853 for (; it != snapshots.end(); ++it) {
854 // Insert them to a vector that is less efficient to access
855 // concurrently
856 snapshots_.push_back(*it);
857 }
858 // Update the size at the end. Otherwise a parallel reader might read
859 // items that are not set yet.
860 snapshots_total_.store(snapshots.size(), std::memory_order_release);
861
862 // Note: this must be done after the snapshots data structures are updated
863 // with the new list of snapshots.
864 CleanupReleasedSnapshots(snapshots, snapshots_all_);
865 snapshots_all_ = snapshots;
866
867 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
868 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
869 }
870
871 void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
872 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
873 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
874 #ifndef NDEBUG
875 size_t sync_i = 0;
876 #endif
877 // First check the snapshot cache that is efficient for concurrent access
878 auto cnt = snapshots_total_.load(std::memory_order_acquire);
879 // The list might get updated concurrently as we are reading from it. The
880 // reader should be able to read all the snapshots that are still valid
881 // after the update. Since the survived snapshots are written in a higher
882 // place before gets overwritten the reader that reads bottom-up will
883 // eventully see it.
884 const bool next_is_larger = true;
885 // We will set to true if the border line snapshot suggests that.
886 bool search_larger_list = false;
887 size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
888 for (; 0 < ip1; ip1--) {
889 SequenceNumber snapshot_seq =
890 snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
891 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
892 ++sync_i);
893 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
894 if (ip1 == SNAPSHOT_CACHE_SIZE) { // border line snapshot
895 // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
896 // then later also continue the search to larger snapshots
897 search_larger_list = snapshot_seq < evicted.commit_seq;
898 }
899 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
900 snapshot_seq, !next_is_larger)) {
901 break;
902 }
903 }
904 #ifndef NDEBUG
905 // Release the remaining sync points before accquiring the lock
906 for (++sync_i; sync_i <= 10; ++sync_i) {
907 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
908 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
909 }
910 #endif
911 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
912 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
913 if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
914 // Then access the less efficient list of snapshots_
915 WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
916 ROCKS_LOG_WARN(info_log_,
917 "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
918 "> with %" ROCKSDB_PRIszt " snapshots",
919 evicted.prep_seq, evicted.commit_seq, cnt);
920 ReadLock rl(&snapshots_mutex_);
921 // Items could have moved from the snapshots_ to snapshot_cache_ before
922 // accquiring the lock. To make sure that we do not miss a valid snapshot,
923 // read snapshot_cache_ again while holding the lock.
924 for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
925 SequenceNumber snapshot_seq =
926 snapshot_cache_[i].load(std::memory_order_acquire);
927 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
928 snapshot_seq, next_is_larger)) {
929 break;
930 }
931 }
932 for (auto snapshot_seq_2 : snapshots_) {
933 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
934 snapshot_seq_2, next_is_larger)) {
935 break;
936 }
937 }
938 }
939 }
940
941 bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
942 const uint64_t& prep_seq, const uint64_t& commit_seq,
943 const uint64_t& snapshot_seq, const bool next_is_larger = true) {
944 // If we do not store an entry in old_commit_map_ we assume it is committed in
945 // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
946 // the snapshot so we need not to keep the entry around for this snapshot.
947 if (commit_seq <= snapshot_seq) {
948 // continue the search if the next snapshot could be smaller than commit_seq
949 return !next_is_larger;
950 }
951 // then snapshot_seq < commit_seq
952 if (prep_seq <= snapshot_seq) { // overlapping range
953 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
954 ROCKS_LOG_WARN(info_log_,
955 "old_commit_map_mutex_ overhead for %" PRIu64
956 " commit entry: <%" PRIu64 ",%" PRIu64 ">",
957 snapshot_seq, prep_seq, commit_seq);
958 WriteLock wl(&old_commit_map_mutex_);
959 old_commit_map_empty_.store(false, std::memory_order_release);
960 auto& vec = old_commit_map_[snapshot_seq];
961 vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
962 // We need to store it once for each overlapping snapshot. Returning true to
963 // continue the search if there is more overlapping snapshot.
964 return true;
965 }
966 // continue the search if the next snapshot could be larger than prep_seq
967 return next_is_larger;
968 }
969
970 WritePreparedTxnDB::~WritePreparedTxnDB() {
971 // At this point there could be running compaction/flush holding a
972 // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
973 // Make sure those jobs finished before destructing WritePreparedTxnDB.
974 if (!db_impl_->shutting_down_) {
975 db_impl_->CancelAllBackgroundWork(true /*wait*/);
976 }
977 }
978
979 void SubBatchCounter::InitWithComp(const uint32_t cf) {
980 auto cmp = comparators_[cf];
981 keys_[cf] = CFKeys(SetComparator(cmp));
982 }
983
984 void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
985 CFKeys& cf_keys = keys_[cf];
986 if (cf_keys.size() == 0) { // just inserted
987 InitWithComp(cf);
988 }
989 auto it = cf_keys.insert(key);
990 if (it.second == false) { // second is false if a element already existed.
991 batches_++;
992 keys_.clear();
993 InitWithComp(cf);
994 keys_[cf].insert(key);
995 }
996 }
997
998 } // namespace ROCKSDB_NAMESPACE
999 #endif // ROCKSDB_LITE