]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
8#ifndef __STDC_FORMAT_MACROS
9#define __STDC_FORMAT_MACROS
10#endif
11
12#include "utilities/transactions/write_prepared_txn_db.h"
13
14#include <inttypes.h>
15#include <algorithm>
16#include <string>
17#include <unordered_set>
18#include <vector>
19
20#include "db/db_impl.h"
21#include "rocksdb/db.h"
22#include "rocksdb/options.h"
23#include "rocksdb/utilities/transaction_db.h"
24#include "util/cast_util.h"
25#include "util/mutexlock.h"
26#include "util/string_util.h"
27#include "util/sync_point.h"
28#include "utilities/transactions/pessimistic_transaction.h"
29#include "utilities/transactions/transaction_db_mutex_impl.h"
30
31namespace rocksdb {
32
33Status WritePreparedTxnDB::Initialize(
34 const std::vector<size_t>& compaction_enabled_cf_indices,
35 const std::vector<ColumnFamilyHandle*>& handles) {
36 auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
37 assert(dbimpl != nullptr);
38 auto rtxns = dbimpl->recovered_transactions();
39 for (auto rtxn : rtxns) {
40 // There should only one batch for WritePrepared policy.
41 assert(rtxn.second->batches_.size() == 1);
42 const auto& seq = rtxn.second->batches_.begin()->first;
43 const auto& batch_info = rtxn.second->batches_.begin()->second;
44 auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
45 for (size_t i = 0; i < cnt; i++) {
46 AddPrepared(seq + i);
47 }
48 }
49 SequenceNumber prev_max = max_evicted_seq_;
50 SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
51 AdvanceMaxEvictedSeq(prev_max, last_seq);
494da23a
TL
52 // Create a gap between max and the next snapshot. This simplifies the logic
53 // in IsInSnapshot by not having to consider the special case of max ==
54 // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
55 if (last_seq) {
56 db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
57 db_impl_->versions_->SetLastSequence(last_seq + 1);
58 db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
59 }
11fdf7f2
TL
60
61 db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
62 // A callback to commit a single sub-batch
63 class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
64 public:
65 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
66 : db_(db) {}
494da23a
TL
67 Status Callback(SequenceNumber commit_seq,
68 bool is_mem_disabled __attribute__((__unused__)),
69 uint64_t) override {
11fdf7f2
TL
70 assert(!is_mem_disabled);
71 db_->AddCommitted(commit_seq, commit_seq);
72 return Status::OK();
73 }
74
75 private:
76 WritePreparedTxnDB* db_;
77 };
78 db_impl_->SetRecoverableStatePreReleaseCallback(
79 new CommitSubBatchPreReleaseCallback(this));
80
81 auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
82 handles);
83 return s;
84}
85
86Status WritePreparedTxnDB::VerifyCFOptions(
87 const ColumnFamilyOptions& cf_options) {
88 Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
89 if (!s.ok()) {
90 return s;
91 }
92 if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
93 return Status::InvalidArgument(
94 "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
95 "WritePrpeared transactions");
96 }
97 return Status::OK();
98}
99
100Transaction* WritePreparedTxnDB::BeginTransaction(
101 const WriteOptions& write_options, const TransactionOptions& txn_options,
102 Transaction* old_txn) {
103 if (old_txn != nullptr) {
104 ReinitializeTransaction(old_txn, write_options, txn_options);
105 return old_txn;
106 } else {
107 return new WritePreparedTxn(this, write_options, txn_options);
108 }
109}
110
111Status WritePreparedTxnDB::Write(
112 const WriteOptions& opts,
113 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
114 if (optimizations.skip_concurrency_control) {
115 // Skip locking the rows
116 const size_t UNKNOWN_BATCH_CNT = 0;
117 const size_t ONE_BATCH_CNT = 1;
118 const size_t batch_cnt = optimizations.skip_duplicate_key_check
119 ? ONE_BATCH_CNT
120 : UNKNOWN_BATCH_CNT;
121 WritePreparedTxn* NO_TXN = nullptr;
122 return WriteInternal(opts, updates, batch_cnt, NO_TXN);
123 } else {
124 // TODO(myabandeh): Make use of skip_duplicate_key_check hint
125 // Fall back to unoptimized version
126 return PessimisticTransactionDB::Write(opts, updates);
127 }
128}
129
130Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
131 WriteBatch* batch, size_t batch_cnt,
132 WritePreparedTxn* txn) {
133 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
134 "CommitBatchInternal");
135 if (batch->Count() == 0) {
136 // Otherwise our 1 seq per batch logic will break since there is no seq
137 // increased for this batch.
138 return Status::OK();
139 }
140 if (batch_cnt == 0) { // not provided, then compute it
141 // TODO(myabandeh): add an option to allow user skipping this cost
142 SubBatchCounter counter(*GetCFComparatorMap());
143 auto s = batch->Iterate(&counter);
144 assert(s.ok());
145 batch_cnt = counter.BatchCount();
146 WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
147 ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
148 static_cast<uint64_t>(batch_cnt));
149 }
150 assert(batch_cnt);
151
152 bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
153 WriteOptions write_options(write_options_orig);
154 bool sync = write_options.sync;
155 if (!do_one_write) {
156 // No need to sync on the first write
157 write_options.sync = false;
158 }
159 // In the absence of Prepare markers, use Noop as a batch separator
160 WriteBatchInternal::InsertNoop(batch);
161 const bool DISABLE_MEMTABLE = true;
162 const uint64_t no_log_ref = 0;
163 uint64_t seq_used = kMaxSequenceNumber;
164 const size_t ZERO_PREPARES = 0;
494da23a 165 const bool kSeperatePrepareCommitBatches = true;
11fdf7f2
TL
166 // Since this is not 2pc, there is no need for AddPrepared but having it in
167 // the PreReleaseCallback enables an optimization. Refer to
168 // SmallestUnCommittedSeq for more details.
169 AddPreparedCallback add_prepared_callback(
494da23a
TL
170 this, db_impl_, batch_cnt,
171 db_impl_->immutable_db_options().two_write_queues,
172 !kSeperatePrepareCommitBatches);
11fdf7f2
TL
173 WritePreparedCommitEntryPreReleaseCallback update_commit_map(
174 this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
175 PreReleaseCallback* pre_release_callback;
176 if (do_one_write) {
177 pre_release_callback = &update_commit_map;
178 } else {
179 pre_release_callback = &add_prepared_callback;
180 }
181 auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
182 no_log_ref, !DISABLE_MEMTABLE, &seq_used,
183 batch_cnt, pre_release_callback);
184 assert(!s.ok() || seq_used != kMaxSequenceNumber);
185 uint64_t prepare_seq = seq_used;
186 if (txn != nullptr) {
187 txn->SetId(prepare_seq);
188 }
189 if (!s.ok()) {
190 return s;
191 }
192 if (do_one_write) {
193 return s;
194 } // else do the 2nd write for commit
195 // Set the original value of sync
196 write_options.sync = sync;
197 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
198 "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
199 prepare_seq);
200 // Commit the batch by writing an empty batch to the 2nd queue that will
201 // release the commit sequence number to readers.
202 const size_t ZERO_COMMITS = 0;
203 WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
204 this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
205 WriteBatch empty_batch;
206 empty_batch.PutLogData(Slice());
207 const size_t ONE_BATCH = 1;
208 // In the absence of Prepare markers, use Noop as a batch separator
209 WriteBatchInternal::InsertNoop(&empty_batch);
210 s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
211 no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
212 &update_commit_map_with_prepare);
213 assert(!s.ok() || seq_used != kMaxSequenceNumber);
214 // Note RemovePrepared should be called after WriteImpl that publishsed the
215 // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
216 RemovePrepared(prepare_seq, batch_cnt);
217 return s;
218}
219
220Status WritePreparedTxnDB::Get(const ReadOptions& options,
221 ColumnFamilyHandle* column_family,
222 const Slice& key, PinnableSlice* value) {
223 // We are fine with the latest committed value. This could be done by
224 // specifying the snapshot as kMaxSequenceNumber.
225 SequenceNumber seq = kMaxSequenceNumber;
226 SequenceNumber min_uncommitted = 0;
227 if (options.snapshot != nullptr) {
228 seq = options.snapshot->GetSequenceNumber();
229 min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
230 options.snapshot)
231 ->min_uncommitted_;
232 } else {
233 min_uncommitted = SmallestUnCommittedSeq();
234 }
235 WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
236 bool* dont_care = nullptr;
237 // Note: no need to specify a snapshot for read options as no specific
238 // snapshot is requested by the user.
239 return db_impl_->GetImpl(options, column_family, key, value, dont_care,
240 &callback);
241}
242
243void WritePreparedTxnDB::UpdateCFComparatorMap(
244 const std::vector<ColumnFamilyHandle*>& handles) {
245 auto cf_map = new std::map<uint32_t, const Comparator*>();
246 auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
247 for (auto h : handles) {
248 auto id = h->GetID();
249 const Comparator* comparator = h->GetComparator();
250 (*cf_map)[id] = comparator;
251 if (id != 0) {
252 (*handle_map)[id] = h;
253 } else {
254 // The pointer to the default cf handle in the handles will be deleted.
255 // Use the pointer maintained by the db instead.
256 (*handle_map)[id] = DefaultColumnFamily();
257 }
258 }
259 cf_map_.reset(cf_map);
260 handle_map_.reset(handle_map);
261}
262
263void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
264 auto old_cf_map_ptr = cf_map_.get();
265 assert(old_cf_map_ptr);
266 auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
267 auto old_handle_map_ptr = handle_map_.get();
268 assert(old_handle_map_ptr);
269 auto handle_map =
270 new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
271 auto id = h->GetID();
272 const Comparator* comparator = h->GetComparator();
273 (*cf_map)[id] = comparator;
274 (*handle_map)[id] = h;
275 cf_map_.reset(cf_map);
276 handle_map_.reset(handle_map);
277}
278
279
280std::vector<Status> WritePreparedTxnDB::MultiGet(
281 const ReadOptions& options,
282 const std::vector<ColumnFamilyHandle*>& column_family,
283 const std::vector<Slice>& keys, std::vector<std::string>* values) {
284 assert(values);
285 size_t num_keys = keys.size();
286 values->resize(num_keys);
287
288 std::vector<Status> stat_list(num_keys);
289 for (size_t i = 0; i < num_keys; ++i) {
290 std::string* value = values ? &(*values)[i] : nullptr;
291 stat_list[i] = this->Get(options, column_family[i], keys[i], value);
292 }
293 return stat_list;
294}
295
296// Struct to hold ownership of snapshot and read callback for iterator cleanup.
297struct WritePreparedTxnDB::IteratorState {
298 IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
299 std::shared_ptr<ManagedSnapshot> s,
300 SequenceNumber min_uncommitted)
301 : callback(txn_db, sequence, min_uncommitted), snapshot(s) {}
302
303 WritePreparedTxnReadCallback callback;
304 std::shared_ptr<ManagedSnapshot> snapshot;
305};
306
307namespace {
308static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
309 delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
310}
311} // anonymous namespace
312
313Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
314 ColumnFamilyHandle* column_family) {
315 constexpr bool ALLOW_BLOB = true;
316 constexpr bool ALLOW_REFRESH = true;
317 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
318 SequenceNumber snapshot_seq = kMaxSequenceNumber;
319 SequenceNumber min_uncommitted = 0;
320 if (options.snapshot != nullptr) {
321 snapshot_seq = options.snapshot->GetSequenceNumber();
322 min_uncommitted =
323 static_cast_with_check<const SnapshotImpl, const Snapshot>(
324 options.snapshot)
325 ->min_uncommitted_;
326 } else {
327 auto* snapshot = GetSnapshot();
328 // We take a snapshot to make sure that the related data in the commit map
329 // are not deleted.
330 snapshot_seq = snapshot->GetSequenceNumber();
331 min_uncommitted =
332 static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
333 ->min_uncommitted_;
334 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
335 }
336 assert(snapshot_seq != kMaxSequenceNumber);
337 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
338 auto* state =
339 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
340 auto* db_iter =
341 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
342 !ALLOW_BLOB, !ALLOW_REFRESH);
343 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
344 return db_iter;
345}
346
347Status WritePreparedTxnDB::NewIterators(
348 const ReadOptions& options,
349 const std::vector<ColumnFamilyHandle*>& column_families,
350 std::vector<Iterator*>* iterators) {
351 constexpr bool ALLOW_BLOB = true;
352 constexpr bool ALLOW_REFRESH = true;
353 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
354 SequenceNumber snapshot_seq = kMaxSequenceNumber;
355 SequenceNumber min_uncommitted = 0;
356 if (options.snapshot != nullptr) {
357 snapshot_seq = options.snapshot->GetSequenceNumber();
358 min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
359 options.snapshot)
360 ->min_uncommitted_;
361 } else {
362 auto* snapshot = GetSnapshot();
363 // We take a snapshot to make sure that the related data in the commit map
364 // are not deleted.
365 snapshot_seq = snapshot->GetSequenceNumber();
366 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
367 min_uncommitted =
368 static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
369 ->min_uncommitted_;
370 }
371 iterators->clear();
372 iterators->reserve(column_families.size());
373 for (auto* column_family : column_families) {
374 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
375 auto* state =
376 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
377 auto* db_iter =
378 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
379 !ALLOW_BLOB, !ALLOW_REFRESH);
380 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
381 iterators->push_back(db_iter);
382 }
383 return Status::OK();
384}
385
386void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
387 // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
388 // around.
389 INC_STEP_FOR_MAX_EVICTED =
390 std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
494da23a 391 snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
11fdf7f2 392 new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
494da23a 393 commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
11fdf7f2
TL
394 new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
395}
396
494da23a
TL
397void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max) {
398 prepared_mutex_.AssertHeld();
399 // When max_evicted_seq_ advances, move older entries from prepared_txns_
400 // to delayed_prepared_. This guarantees that if a seq is lower than max,
401 // then it is not in prepared_txns_ and save an expensive, synchronized
402 // lookup from a shared set. delayed_prepared_ is expected to be empty in
403 // normal cases.
404 ROCKS_LOG_DETAILS(
405 info_log_,
406 "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
407 prepared_txns_.empty(),
408 prepared_txns_.empty() ? 0 : prepared_txns_.top());
409 while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
410 auto to_be_popped = prepared_txns_.top();
411 delayed_prepared_.insert(to_be_popped);
412 ROCKS_LOG_WARN(info_log_,
413 "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
414 " new_max=%" PRIu64,
415 static_cast<uint64_t>(delayed_prepared_.size()),
416 to_be_popped, new_max);
417 prepared_txns_.pop();
418 delayed_prepared_empty_.store(false, std::memory_order_release);
11fdf7f2 419 }
494da23a
TL
420}
421
422void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
423 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
424 seq, max_evicted_seq_.load());
425 TEST_SYNC_POINT("AddPrepared::begin:pause");
426 TEST_SYNC_POINT("AddPrepared::begin:resume");
11fdf7f2
TL
427 WriteLock wl(&prepared_mutex_);
428 prepared_txns_.push(seq);
494da23a
TL
429 auto new_max = future_max_evicted_seq_.load();
430 if (UNLIKELY(seq <= new_max)) {
431 // This should not happen in normal case
432 ROCKS_LOG_ERROR(
433 info_log_,
434 "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
435 " <= %" PRIu64,
436 seq, new_max);
437 CheckPreparedAgainstMax(new_max);
438 }
439 TEST_SYNC_POINT("AddPrepared::end");
11fdf7f2
TL
440}
441
442void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
443 uint8_t loop_cnt) {
444 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
445 prepare_seq, commit_seq);
446 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
447 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
448 auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
449 CommitEntry64b evicted_64b;
450 CommitEntry evicted;
451 bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
452 if (LIKELY(to_be_evicted)) {
453 assert(evicted.prep_seq != prepare_seq);
454 auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
455 ROCKS_LOG_DETAILS(info_log_,
456 "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
457 evicted.prep_seq, evicted.commit_seq, prev_max);
458 if (prev_max < evicted.commit_seq) {
494da23a
TL
459 auto last = db_impl_->GetLastPublishedSequence(); // could be 0
460 SequenceNumber max_evicted_seq;
461 if (LIKELY(evicted.commit_seq < last)) {
462 assert(last > 0);
463 // Inc max in larger steps to avoid frequent updates
464 max_evicted_seq =
465 std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
466 } else {
467 // legit when a commit entry in a write batch overwrite the previous one
468 max_evicted_seq = evicted.commit_seq;
469 }
470 ROCKS_LOG_DETAILS(info_log_,
471 "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
472 " => %lu",
473 prepare_seq, evicted.prep_seq, evicted.commit_seq,
474 prev_max, max_evicted_seq);
11fdf7f2
TL
475 AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
476 }
477 // After each eviction from commit cache, check if the commit entry should
478 // be kept around because it overlaps with a live snapshot.
479 CheckAgainstSnapshots(evicted);
494da23a
TL
480 if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
481 WriteLock wl(&prepared_mutex_);
482 for (auto dp : delayed_prepared_) {
483 if (dp == evicted.prep_seq) {
484 // This is a rare case that txn is committed but prepared_txns_ is not
485 // cleaned up yet. Refer to delayed_prepared_commits_ definition for
486 // why it should be kept updated.
487 delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
488 ROCKS_LOG_DEBUG(info_log_,
489 "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
490 evicted.prep_seq, evicted.commit_seq);
491 break;
492 }
493 }
494 }
11fdf7f2
TL
495 }
496 bool succ =
497 ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
498 if (UNLIKELY(!succ)) {
499 ROCKS_LOG_ERROR(info_log_,
500 "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
501 ",%" PRIu64 " retrying...",
502 indexed_seq, prepare_seq, commit_seq);
503 // A very rare event, in which the commit entry is updated before we do.
504 // Here we apply a very simple solution of retrying.
505 if (loop_cnt > 100) {
506 throw std::runtime_error("Infinite loop in AddCommitted!");
507 }
508 AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
509 return;
510 }
511 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
512 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
513}
514
515void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
516 const size_t batch_cnt) {
494da23a
TL
517 TEST_SYNC_POINT_CALLBACK(
518 "RemovePrepared:Start",
519 const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
520 TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
521 TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
522 ROCKS_LOG_DETAILS(info_log_,
523 "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
524 prepare_seq, batch_cnt);
11fdf7f2
TL
525 WriteLock wl(&prepared_mutex_);
526 for (size_t i = 0; i < batch_cnt; i++) {
527 prepared_txns_.erase(prepare_seq + i);
528 bool was_empty = delayed_prepared_.empty();
529 if (!was_empty) {
530 delayed_prepared_.erase(prepare_seq + i);
494da23a
TL
531 auto it = delayed_prepared_commits_.find(prepare_seq + i);
532 if (it != delayed_prepared_commits_.end()) {
533 ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
534 prepare_seq + i);
535 delayed_prepared_commits_.erase(it);
536 }
11fdf7f2
TL
537 bool is_empty = delayed_prepared_.empty();
538 if (was_empty != is_empty) {
539 delayed_prepared_empty_.store(is_empty, std::memory_order_release);
540 }
541 }
542 }
543}
544
545bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
546 CommitEntry64b* entry_64b,
547 CommitEntry* entry) const {
548 *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(std::memory_order_acquire);
549 bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
550 return valid;
551}
552
553bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
554 const CommitEntry& new_entry,
555 CommitEntry* evicted_entry) {
556 CommitEntry64b new_entry_64b(new_entry, FORMAT);
557 CommitEntry64b evicted_entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
558 new_entry_64b, std::memory_order_acq_rel);
559 bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
560 return valid;
561}
562
563bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
564 CommitEntry64b& expected_entry_64b,
565 const CommitEntry& new_entry) {
566 auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
567 CommitEntry64b new_entry_64b(new_entry, FORMAT);
568 bool succ = atomic_entry.compare_exchange_strong(
569 expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
570 std::memory_order_acquire);
571 return succ;
572}
573
574void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
575 const SequenceNumber& new_max) {
576 ROCKS_LOG_DETAILS(info_log_,
577 "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
578 prev_max, new_max);
494da23a
TL
579 // Declare the intention before getting snapshot from the DB. This helps a
580 // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
581 // it has not already. Otherwise the new snapshot is when we ask DB for
582 // snapshots smaller than future max.
583 auto updated_future_max = prev_max;
584 while (updated_future_max < new_max &&
585 !future_max_evicted_seq_.compare_exchange_weak(
586 updated_future_max, new_max, std::memory_order_acq_rel,
587 std::memory_order_relaxed)) {
588 };
589
11fdf7f2
TL
590 {
591 WriteLock wl(&prepared_mutex_);
494da23a 592 CheckPreparedAgainstMax(new_max);
11fdf7f2
TL
593 }
594
595 // With each change to max_evicted_seq_ fetch the live snapshots behind it.
596 // We use max as the version of snapshots to identify how fresh are the
597 // snapshot list. This works because the snapshots are between 0 and
598 // max, so the larger the max, the more complete they are.
599 SequenceNumber new_snapshots_version = new_max;
600 std::vector<SequenceNumber> snapshots;
601 bool update_snapshots = false;
602 if (new_snapshots_version > snapshots_version_) {
603 // This is to avoid updating the snapshots_ if it already updated
604 // with a more recent vesion by a concrrent thread
605 update_snapshots = true;
606 // We only care about snapshots lower then max
607 snapshots = GetSnapshotListFromDB(new_max);
608 }
609 if (update_snapshots) {
610 UpdateSnapshots(snapshots, new_snapshots_version);
494da23a
TL
611 if (!snapshots.empty()) {
612 WriteLock wl(&old_commit_map_mutex_);
613 for (auto snap : snapshots) {
614 // This allows IsInSnapshot to tell apart the reads from in valid
615 // snapshots from the reads from committed values in valid snapshots.
616 old_commit_map_[snap];
617 }
618 old_commit_map_empty_.store(false, std::memory_order_release);
619 }
11fdf7f2
TL
620 }
621 auto updated_prev_max = prev_max;
494da23a
TL
622 TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
623 TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
11fdf7f2
TL
624 while (updated_prev_max < new_max &&
625 !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
626 std::memory_order_acq_rel,
627 std::memory_order_relaxed)) {
628 };
629}
630
631const Snapshot* WritePreparedTxnDB::GetSnapshot() {
494da23a
TL
632 const bool kForWWConflictCheck = true;
633 return GetSnapshotInternal(!kForWWConflictCheck);
634}
635
636SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
637 bool for_ww_conflict_check) {
638 // Note: for this optimization setting the last sequence number and obtaining
639 // the smallest uncommitted seq should be done atomically. However to avoid
640 // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
641 // snapshot. Since we always updated the list of unprepared seq (via
642 // AddPrepared) AFTER the last sequence is updated, this guarantees that the
643 // smallest uncommitted seq that we pair with the snapshot is smaller or equal
644 // the value that would be obtained otherwise atomically. That is ok since
645 // this optimization works as long as min_uncommitted is less than or equal
646 // than the smallest uncommitted seq when the snapshot was taken.
11fdf7f2 647 auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
494da23a 648 SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
11fdf7f2 649 assert(snap_impl);
494da23a
TL
650 SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
651 // Note: Check against future_max_evicted_seq_ (in contrast with
652 // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
653 if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
654 // There is a very rare case in which the commit entry evicts another commit
655 // entry that is not published yet thus advancing max evicted seq beyond the
656 // last published seq. This case is not likely in real-world setup so we
657 // handle it with a few retries.
658 size_t retry = 0;
659 SequenceNumber max;
660 while ((max = future_max_evicted_seq_.load()) != 0 &&
661 snap_impl->GetSequenceNumber() <= max && retry < 100) {
662 ROCKS_LOG_WARN(info_log_,
663 "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
664 " retry %" ROCKSDB_PRIszt,
665 snap_impl->GetSequenceNumber(), max, retry);
666 ReleaseSnapshot(snap_impl);
667 // Wait for last visible seq to catch up with max, and also go beyond it
668 // by one.
669 AdvanceSeqByOne();
670 snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
671 assert(snap_impl);
672 retry++;
673 }
674 assert(snap_impl->GetSequenceNumber() > max);
675 if (snap_impl->GetSequenceNumber() <= max) {
676 throw std::runtime_error(
677 "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
678 " after " + ToString(retry) +
679 " retries is still less than futre_max_evicted_seq_" + ToString(max));
680 }
681 }
11fdf7f2 682 EnhanceSnapshot(snap_impl, min_uncommitted);
494da23a
TL
683 ROCKS_LOG_DETAILS(
684 db_impl_->immutable_db_options().info_log,
685 "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
686 snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
11fdf7f2
TL
687 return snap_impl;
688}
689
494da23a
TL
690void WritePreparedTxnDB::AdvanceSeqByOne() {
691 // Inserting an empty value will i) let the max evicted entry to be
692 // published, i.e., max == last_published, increase the last published to
693 // be one beyond max, i.e., max < last_published.
694 WriteOptions woptions;
695 TransactionOptions txn_options;
696 Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
697 std::hash<std::thread::id> hasher;
698 char name[64];
699 snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
700 assert(strlen(name) < 64 - 1);
701 Status s = txn0->SetName(name);
702 assert(s.ok());
703 if (s.ok()) {
704 // Without prepare it would simply skip the commit
705 s = txn0->Prepare();
706 }
707 assert(s.ok());
708 if (s.ok()) {
709 s = txn0->Commit();
710 }
711 assert(s.ok());
712 delete txn0;
713}
714
11fdf7f2
TL
715const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
716 SequenceNumber max) {
717 ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
494da23a
TL
718 InstrumentedMutexLock dblock(db_impl_->mutex());
719 db_impl_->mutex()->AssertHeld();
11fdf7f2
TL
720 return db_impl_->snapshots().GetAll(nullptr, max);
721}
722
11fdf7f2
TL
723void WritePreparedTxnDB::ReleaseSnapshotInternal(
724 const SequenceNumber snap_seq) {
494da23a
TL
725 // TODO(myabandeh): relax should enough since the synchronizatin is already
726 // done by snapshots_mutex_ under which this function is called.
727 if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
11fdf7f2
TL
728 // Then this is a rare case that transaction did not finish before max
729 // advances. It is expected for a few read-only backup snapshots. For such
730 // snapshots we might have kept around a couple of entries in the
731 // old_commit_map_. Check and do garbage collection if that is the case.
732 bool need_gc = false;
733 {
734 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
494da23a
TL
735 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
736 snap_seq);
11fdf7f2
TL
737 ReadLock rl(&old_commit_map_mutex_);
738 auto prep_set_entry = old_commit_map_.find(snap_seq);
739 need_gc = prep_set_entry != old_commit_map_.end();
740 }
741 if (need_gc) {
742 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
494da23a
TL
743 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
744 snap_seq);
11fdf7f2
TL
745 WriteLock wl(&old_commit_map_mutex_);
746 old_commit_map_.erase(snap_seq);
747 old_commit_map_empty_.store(old_commit_map_.empty(),
748 std::memory_order_release);
749 }
750 }
751}
752
494da23a
TL
753void WritePreparedTxnDB::CleanupReleasedSnapshots(
754 const std::vector<SequenceNumber>& new_snapshots,
755 const std::vector<SequenceNumber>& old_snapshots) {
756 auto newi = new_snapshots.begin();
757 auto oldi = old_snapshots.begin();
758 for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
759 assert(*newi >= *oldi); // cannot have new snapshots with lower seq
760 if (*newi == *oldi) { // still not released
761 auto value = *newi;
762 while (newi != new_snapshots.end() && *newi == value) {
763 newi++;
764 }
765 while (oldi != old_snapshots.end() && *oldi == value) {
766 oldi++;
767 }
768 } else {
769 assert(*newi > *oldi); // *oldi is released
770 ReleaseSnapshotInternal(*oldi);
771 oldi++;
772 }
773 }
774 // Everything remained in old_snapshots is released and must be cleaned up
775 for (; oldi != old_snapshots.end(); oldi++) {
776 ReleaseSnapshotInternal(*oldi);
777 }
778}
779
11fdf7f2
TL
780void WritePreparedTxnDB::UpdateSnapshots(
781 const std::vector<SequenceNumber>& snapshots,
782 const SequenceNumber& version) {
783 ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
784 version);
785 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
786 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
787#ifndef NDEBUG
788 size_t sync_i = 0;
789#endif
790 ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
791 WriteLock wl(&snapshots_mutex_);
792 snapshots_version_ = version;
793 // We update the list concurrently with the readers.
794 // Both new and old lists are sorted and the new list is subset of the
795 // previous list plus some new items. Thus if a snapshot repeats in
796 // both new and old lists, it will appear upper in the new list. So if
797 // we simply insert the new snapshots in order, if an overwritten item
798 // is still valid in the new list is either written to the same place in
799 // the array or it is written in a higher palce before it gets
800 // overwritten by another item. This guarantess a reader that reads the
801 // list bottom-up will eventaully see a snapshot that repeats in the
802 // update, either before it gets overwritten by the writer or
803 // afterwards.
804 size_t i = 0;
805 auto it = snapshots.begin();
806 for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
807 snapshot_cache_[i].store(*it, std::memory_order_release);
808 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
809 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
810 }
811#ifndef NDEBUG
812 // Release the remaining sync points since they are useless given that the
813 // reader would also use lock to access snapshots
814 for (++sync_i; sync_i <= 10; ++sync_i) {
815 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
816 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
817 }
818#endif
819 snapshots_.clear();
820 for (; it != snapshots.end(); it++) {
821 // Insert them to a vector that is less efficient to access
822 // concurrently
823 snapshots_.push_back(*it);
824 }
825 // Update the size at the end. Otherwise a parallel reader might read
826 // items that are not set yet.
827 snapshots_total_.store(snapshots.size(), std::memory_order_release);
494da23a
TL
828
829 // Note: this must be done after the snapshots data structures are updated
830 // with the new list of snapshots.
831 CleanupReleasedSnapshots(snapshots, snapshots_all_);
832 snapshots_all_ = snapshots;
833
11fdf7f2
TL
834 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
835 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
836}
837
838void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
839 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
840 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
841#ifndef NDEBUG
842 size_t sync_i = 0;
843#endif
844 // First check the snapshot cache that is efficient for concurrent access
845 auto cnt = snapshots_total_.load(std::memory_order_acquire);
846 // The list might get updated concurrently as we are reading from it. The
847 // reader should be able to read all the snapshots that are still valid
848 // after the update. Since the survived snapshots are written in a higher
849 // place before gets overwritten the reader that reads bottom-up will
850 // eventully see it.
851 const bool next_is_larger = true;
494da23a
TL
852 // We will set to true if the border line snapshot suggests that.
853 bool search_larger_list = false;
11fdf7f2
TL
854 size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
855 for (; 0 < ip1; ip1--) {
494da23a
TL
856 SequenceNumber snapshot_seq =
857 snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
11fdf7f2
TL
858 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
859 ++sync_i);
860 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
494da23a
TL
861 if (ip1 == SNAPSHOT_CACHE_SIZE) { // border line snapshot
862 // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
863 // then later also continue the search to larger snapshots
864 search_larger_list = snapshot_seq < evicted.commit_seq;
865 }
11fdf7f2
TL
866 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
867 snapshot_seq, !next_is_larger)) {
868 break;
869 }
870 }
871#ifndef NDEBUG
872 // Release the remaining sync points before accquiring the lock
873 for (++sync_i; sync_i <= 10; ++sync_i) {
874 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
875 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
876 }
877#endif
878 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
879 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
494da23a 880 if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
11fdf7f2
TL
881 // Then access the less efficient list of snapshots_
882 WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
494da23a
TL
883 ROCKS_LOG_WARN(info_log_,
884 "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
885 "> with %" ROCKSDB_PRIszt " snapshots",
886 evicted.prep_seq, evicted.commit_seq, cnt);
11fdf7f2
TL
887 ReadLock rl(&snapshots_mutex_);
888 // Items could have moved from the snapshots_ to snapshot_cache_ before
889 // accquiring the lock. To make sure that we do not miss a valid snapshot,
890 // read snapshot_cache_ again while holding the lock.
891 for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
494da23a
TL
892 SequenceNumber snapshot_seq =
893 snapshot_cache_[i].load(std::memory_order_acquire);
11fdf7f2
TL
894 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
895 snapshot_seq, next_is_larger)) {
896 break;
897 }
898 }
899 for (auto snapshot_seq_2 : snapshots_) {
900 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
901 snapshot_seq_2, next_is_larger)) {
902 break;
903 }
904 }
905 }
906}
907
908bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
909 const uint64_t& prep_seq, const uint64_t& commit_seq,
910 const uint64_t& snapshot_seq, const bool next_is_larger = true) {
911 // If we do not store an entry in old_commit_map_ we assume it is committed in
912 // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
913 // the snapshot so we need not to keep the entry around for this snapshot.
914 if (commit_seq <= snapshot_seq) {
915 // continue the search if the next snapshot could be smaller than commit_seq
916 return !next_is_larger;
917 }
918 // then snapshot_seq < commit_seq
919 if (prep_seq <= snapshot_seq) { // overlapping range
920 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
494da23a
TL
921 ROCKS_LOG_WARN(info_log_,
922 "old_commit_map_mutex_ overhead for %" PRIu64
923 " commit entry: <%" PRIu64 ",%" PRIu64 ">",
924 snapshot_seq, prep_seq, commit_seq);
11fdf7f2
TL
925 WriteLock wl(&old_commit_map_mutex_);
926 old_commit_map_empty_.store(false, std::memory_order_release);
927 auto& vec = old_commit_map_[snapshot_seq];
928 vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
929 // We need to store it once for each overlapping snapshot. Returning true to
930 // continue the search if there is more overlapping snapshot.
931 return true;
932 }
933 // continue the search if the next snapshot could be larger than prep_seq
934 return next_is_larger;
935}
936
937WritePreparedTxnDB::~WritePreparedTxnDB() {
938 // At this point there could be running compaction/flush holding a
939 // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
940 // Make sure those jobs finished before destructing WritePreparedTxnDB.
941 db_impl_->CancelAllBackgroundWork(true /*wait*/);
942}
943
944void SubBatchCounter::InitWithComp(const uint32_t cf) {
945 auto cmp = comparators_[cf];
946 keys_[cf] = CFKeys(SetComparator(cmp));
947}
948
949void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
950 CFKeys& cf_keys = keys_[cf];
951 if (cf_keys.size() == 0) { // just inserted
952 InitWithComp(cf);
953 }
954 auto it = cf_keys.insert(key);
955 if (it.second == false) { // second is false if a element already existed.
956 batches_++;
957 keys_.clear();
958 InitWithComp(cf);
959 keys_[cf].insert(key);
960 }
961}
962
963} // namespace rocksdb
964#endif // ROCKSDB_LITE