]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
10 #endif
11
12 #include "utilities/transactions/write_prepared_txn_db.h"
13
14 #include <inttypes.h>
15 #include <algorithm>
16 #include <string>
17 #include <unordered_set>
18 #include <vector>
19
20 #include "db/db_impl.h"
21 #include "rocksdb/db.h"
22 #include "rocksdb/options.h"
23 #include "rocksdb/utilities/transaction_db.h"
24 #include "util/cast_util.h"
25 #include "util/mutexlock.h"
26 #include "util/string_util.h"
27 #include "util/sync_point.h"
28 #include "utilities/transactions/pessimistic_transaction.h"
29 #include "utilities/transactions/transaction_db_mutex_impl.h"
30
31 namespace rocksdb {
32
33 Status WritePreparedTxnDB::Initialize(
34 const std::vector<size_t>& compaction_enabled_cf_indices,
35 const std::vector<ColumnFamilyHandle*>& handles) {
36 auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
37 assert(dbimpl != nullptr);
38 auto rtxns = dbimpl->recovered_transactions();
39 for (auto rtxn : rtxns) {
40 // There should only one batch for WritePrepared policy.
41 assert(rtxn.second->batches_.size() == 1);
42 const auto& seq = rtxn.second->batches_.begin()->first;
43 const auto& batch_info = rtxn.second->batches_.begin()->second;
44 auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
45 for (size_t i = 0; i < cnt; i++) {
46 AddPrepared(seq + i);
47 }
48 }
49 SequenceNumber prev_max = max_evicted_seq_;
50 SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
51 AdvanceMaxEvictedSeq(prev_max, last_seq);
52
53 db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
54 // A callback to commit a single sub-batch
55 class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
56 public:
57 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
58 : db_(db) {}
59 virtual Status Callback(SequenceNumber commit_seq,
60 bool is_mem_disabled) override {
61 #ifdef NDEBUG
62 (void)is_mem_disabled;
63 #endif
64 assert(!is_mem_disabled);
65 db_->AddCommitted(commit_seq, commit_seq);
66 return Status::OK();
67 }
68
69 private:
70 WritePreparedTxnDB* db_;
71 };
72 db_impl_->SetRecoverableStatePreReleaseCallback(
73 new CommitSubBatchPreReleaseCallback(this));
74
75 auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
76 handles);
77 return s;
78 }
79
80 Status WritePreparedTxnDB::VerifyCFOptions(
81 const ColumnFamilyOptions& cf_options) {
82 Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
83 if (!s.ok()) {
84 return s;
85 }
86 if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
87 return Status::InvalidArgument(
88 "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
89 "WritePrpeared transactions");
90 }
91 return Status::OK();
92 }
93
94 Transaction* WritePreparedTxnDB::BeginTransaction(
95 const WriteOptions& write_options, const TransactionOptions& txn_options,
96 Transaction* old_txn) {
97 if (old_txn != nullptr) {
98 ReinitializeTransaction(old_txn, write_options, txn_options);
99 return old_txn;
100 } else {
101 return new WritePreparedTxn(this, write_options, txn_options);
102 }
103 }
104
105 Status WritePreparedTxnDB::Write(
106 const WriteOptions& opts,
107 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
108 if (optimizations.skip_concurrency_control) {
109 // Skip locking the rows
110 const size_t UNKNOWN_BATCH_CNT = 0;
111 const size_t ONE_BATCH_CNT = 1;
112 const size_t batch_cnt = optimizations.skip_duplicate_key_check
113 ? ONE_BATCH_CNT
114 : UNKNOWN_BATCH_CNT;
115 WritePreparedTxn* NO_TXN = nullptr;
116 return WriteInternal(opts, updates, batch_cnt, NO_TXN);
117 } else {
118 // TODO(myabandeh): Make use of skip_duplicate_key_check hint
119 // Fall back to unoptimized version
120 return PessimisticTransactionDB::Write(opts, updates);
121 }
122 }
123
124 Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
125 WriteBatch* batch, size_t batch_cnt,
126 WritePreparedTxn* txn) {
127 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
128 "CommitBatchInternal");
129 if (batch->Count() == 0) {
130 // Otherwise our 1 seq per batch logic will break since there is no seq
131 // increased for this batch.
132 return Status::OK();
133 }
134 if (batch_cnt == 0) { // not provided, then compute it
135 // TODO(myabandeh): add an option to allow user skipping this cost
136 SubBatchCounter counter(*GetCFComparatorMap());
137 auto s = batch->Iterate(&counter);
138 assert(s.ok());
139 batch_cnt = counter.BatchCount();
140 WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
141 ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
142 static_cast<uint64_t>(batch_cnt));
143 }
144 assert(batch_cnt);
145
146 bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
147 WriteOptions write_options(write_options_orig);
148 bool sync = write_options.sync;
149 if (!do_one_write) {
150 // No need to sync on the first write
151 write_options.sync = false;
152 }
153 // In the absence of Prepare markers, use Noop as a batch separator
154 WriteBatchInternal::InsertNoop(batch);
155 const bool DISABLE_MEMTABLE = true;
156 const uint64_t no_log_ref = 0;
157 uint64_t seq_used = kMaxSequenceNumber;
158 const size_t ZERO_PREPARES = 0;
159 // Since this is not 2pc, there is no need for AddPrepared but having it in
160 // the PreReleaseCallback enables an optimization. Refer to
161 // SmallestUnCommittedSeq for more details.
162 AddPreparedCallback add_prepared_callback(
163 this, batch_cnt, db_impl_->immutable_db_options().two_write_queues);
164 WritePreparedCommitEntryPreReleaseCallback update_commit_map(
165 this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
166 PreReleaseCallback* pre_release_callback;
167 if (do_one_write) {
168 pre_release_callback = &update_commit_map;
169 } else {
170 pre_release_callback = &add_prepared_callback;
171 }
172 auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
173 no_log_ref, !DISABLE_MEMTABLE, &seq_used,
174 batch_cnt, pre_release_callback);
175 assert(!s.ok() || seq_used != kMaxSequenceNumber);
176 uint64_t prepare_seq = seq_used;
177 if (txn != nullptr) {
178 txn->SetId(prepare_seq);
179 }
180 if (!s.ok()) {
181 return s;
182 }
183 if (do_one_write) {
184 return s;
185 } // else do the 2nd write for commit
186 // Set the original value of sync
187 write_options.sync = sync;
188 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
189 "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
190 prepare_seq);
191 // Commit the batch by writing an empty batch to the 2nd queue that will
192 // release the commit sequence number to readers.
193 const size_t ZERO_COMMITS = 0;
194 WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
195 this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
196 WriteBatch empty_batch;
197 empty_batch.PutLogData(Slice());
198 const size_t ONE_BATCH = 1;
199 // In the absence of Prepare markers, use Noop as a batch separator
200 WriteBatchInternal::InsertNoop(&empty_batch);
201 s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
202 no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
203 &update_commit_map_with_prepare);
204 assert(!s.ok() || seq_used != kMaxSequenceNumber);
205 // Note RemovePrepared should be called after WriteImpl that publishsed the
206 // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
207 RemovePrepared(prepare_seq, batch_cnt);
208 return s;
209 }
210
211 Status WritePreparedTxnDB::Get(const ReadOptions& options,
212 ColumnFamilyHandle* column_family,
213 const Slice& key, PinnableSlice* value) {
214 // We are fine with the latest committed value. This could be done by
215 // specifying the snapshot as kMaxSequenceNumber.
216 SequenceNumber seq = kMaxSequenceNumber;
217 SequenceNumber min_uncommitted = 0;
218 if (options.snapshot != nullptr) {
219 seq = options.snapshot->GetSequenceNumber();
220 min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
221 options.snapshot)
222 ->min_uncommitted_;
223 } else {
224 min_uncommitted = SmallestUnCommittedSeq();
225 }
226 WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
227 bool* dont_care = nullptr;
228 // Note: no need to specify a snapshot for read options as no specific
229 // snapshot is requested by the user.
230 return db_impl_->GetImpl(options, column_family, key, value, dont_care,
231 &callback);
232 }
233
234 void WritePreparedTxnDB::UpdateCFComparatorMap(
235 const std::vector<ColumnFamilyHandle*>& handles) {
236 auto cf_map = new std::map<uint32_t, const Comparator*>();
237 auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
238 for (auto h : handles) {
239 auto id = h->GetID();
240 const Comparator* comparator = h->GetComparator();
241 (*cf_map)[id] = comparator;
242 if (id != 0) {
243 (*handle_map)[id] = h;
244 } else {
245 // The pointer to the default cf handle in the handles will be deleted.
246 // Use the pointer maintained by the db instead.
247 (*handle_map)[id] = DefaultColumnFamily();
248 }
249 }
250 cf_map_.reset(cf_map);
251 handle_map_.reset(handle_map);
252 }
253
254 void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
255 auto old_cf_map_ptr = cf_map_.get();
256 assert(old_cf_map_ptr);
257 auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
258 auto old_handle_map_ptr = handle_map_.get();
259 assert(old_handle_map_ptr);
260 auto handle_map =
261 new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
262 auto id = h->GetID();
263 const Comparator* comparator = h->GetComparator();
264 (*cf_map)[id] = comparator;
265 (*handle_map)[id] = h;
266 cf_map_.reset(cf_map);
267 handle_map_.reset(handle_map);
268 }
269
270
271 std::vector<Status> WritePreparedTxnDB::MultiGet(
272 const ReadOptions& options,
273 const std::vector<ColumnFamilyHandle*>& column_family,
274 const std::vector<Slice>& keys, std::vector<std::string>* values) {
275 assert(values);
276 size_t num_keys = keys.size();
277 values->resize(num_keys);
278
279 std::vector<Status> stat_list(num_keys);
280 for (size_t i = 0; i < num_keys; ++i) {
281 std::string* value = values ? &(*values)[i] : nullptr;
282 stat_list[i] = this->Get(options, column_family[i], keys[i], value);
283 }
284 return stat_list;
285 }
286
287 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
288 struct WritePreparedTxnDB::IteratorState {
289 IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
290 std::shared_ptr<ManagedSnapshot> s,
291 SequenceNumber min_uncommitted)
292 : callback(txn_db, sequence, min_uncommitted), snapshot(s) {}
293
294 WritePreparedTxnReadCallback callback;
295 std::shared_ptr<ManagedSnapshot> snapshot;
296 };
297
298 namespace {
299 static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
300 delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
301 }
302 } // anonymous namespace
303
304 Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
305 ColumnFamilyHandle* column_family) {
306 constexpr bool ALLOW_BLOB = true;
307 constexpr bool ALLOW_REFRESH = true;
308 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
309 SequenceNumber snapshot_seq = kMaxSequenceNumber;
310 SequenceNumber min_uncommitted = 0;
311 if (options.snapshot != nullptr) {
312 snapshot_seq = options.snapshot->GetSequenceNumber();
313 min_uncommitted =
314 static_cast_with_check<const SnapshotImpl, const Snapshot>(
315 options.snapshot)
316 ->min_uncommitted_;
317 } else {
318 auto* snapshot = GetSnapshot();
319 // We take a snapshot to make sure that the related data in the commit map
320 // are not deleted.
321 snapshot_seq = snapshot->GetSequenceNumber();
322 min_uncommitted =
323 static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
324 ->min_uncommitted_;
325 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
326 }
327 assert(snapshot_seq != kMaxSequenceNumber);
328 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
329 auto* state =
330 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
331 auto* db_iter =
332 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
333 !ALLOW_BLOB, !ALLOW_REFRESH);
334 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
335 return db_iter;
336 }
337
338 Status WritePreparedTxnDB::NewIterators(
339 const ReadOptions& options,
340 const std::vector<ColumnFamilyHandle*>& column_families,
341 std::vector<Iterator*>* iterators) {
342 constexpr bool ALLOW_BLOB = true;
343 constexpr bool ALLOW_REFRESH = true;
344 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
345 SequenceNumber snapshot_seq = kMaxSequenceNumber;
346 SequenceNumber min_uncommitted = 0;
347 if (options.snapshot != nullptr) {
348 snapshot_seq = options.snapshot->GetSequenceNumber();
349 min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
350 options.snapshot)
351 ->min_uncommitted_;
352 } else {
353 auto* snapshot = GetSnapshot();
354 // We take a snapshot to make sure that the related data in the commit map
355 // are not deleted.
356 snapshot_seq = snapshot->GetSequenceNumber();
357 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
358 min_uncommitted =
359 static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
360 ->min_uncommitted_;
361 }
362 iterators->clear();
363 iterators->reserve(column_families.size());
364 for (auto* column_family : column_families) {
365 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
366 auto* state =
367 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
368 auto* db_iter =
369 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
370 !ALLOW_BLOB, !ALLOW_REFRESH);
371 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
372 iterators->push_back(db_iter);
373 }
374 return Status::OK();
375 }
376
377 void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
378 // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
379 // around.
380 INC_STEP_FOR_MAX_EVICTED =
381 std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
382 snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
383 new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
384 commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
385 new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
386 }
387
388 void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
389 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
390 assert(seq > max_evicted_seq_);
391 if (seq <= max_evicted_seq_) {
392 throw std::runtime_error(
393 "Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) +
394 " <= " + ToString(max_evicted_seq_.load()));
395 }
396 WriteLock wl(&prepared_mutex_);
397 prepared_txns_.push(seq);
398 }
399
400 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
401 uint8_t loop_cnt) {
402 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
403 prepare_seq, commit_seq);
404 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
405 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
406 auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
407 CommitEntry64b evicted_64b;
408 CommitEntry evicted;
409 bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
410 if (LIKELY(to_be_evicted)) {
411 assert(evicted.prep_seq != prepare_seq);
412 auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
413 ROCKS_LOG_DETAILS(info_log_,
414 "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
415 evicted.prep_seq, evicted.commit_seq, prev_max);
416 if (prev_max < evicted.commit_seq) {
417 // Inc max in larger steps to avoid frequent updates
418 auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
419 AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
420 }
421 // After each eviction from commit cache, check if the commit entry should
422 // be kept around because it overlaps with a live snapshot.
423 CheckAgainstSnapshots(evicted);
424 }
425 bool succ =
426 ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
427 if (UNLIKELY(!succ)) {
428 ROCKS_LOG_ERROR(info_log_,
429 "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
430 ",%" PRIu64 " retrying...",
431 indexed_seq, prepare_seq, commit_seq);
432 // A very rare event, in which the commit entry is updated before we do.
433 // Here we apply a very simple solution of retrying.
434 if (loop_cnt > 100) {
435 throw std::runtime_error("Infinite loop in AddCommitted!");
436 }
437 AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
438 return;
439 }
440 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
441 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
442 }
443
444 void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
445 const size_t batch_cnt) {
446 WriteLock wl(&prepared_mutex_);
447 for (size_t i = 0; i < batch_cnt; i++) {
448 prepared_txns_.erase(prepare_seq + i);
449 bool was_empty = delayed_prepared_.empty();
450 if (!was_empty) {
451 delayed_prepared_.erase(prepare_seq + i);
452 bool is_empty = delayed_prepared_.empty();
453 if (was_empty != is_empty) {
454 delayed_prepared_empty_.store(is_empty, std::memory_order_release);
455 }
456 }
457 }
458 }
459
460 bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
461 CommitEntry64b* entry_64b,
462 CommitEntry* entry) const {
463 *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(std::memory_order_acquire);
464 bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
465 return valid;
466 }
467
468 bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
469 const CommitEntry& new_entry,
470 CommitEntry* evicted_entry) {
471 CommitEntry64b new_entry_64b(new_entry, FORMAT);
472 CommitEntry64b evicted_entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
473 new_entry_64b, std::memory_order_acq_rel);
474 bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
475 return valid;
476 }
477
478 bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
479 CommitEntry64b& expected_entry_64b,
480 const CommitEntry& new_entry) {
481 auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
482 CommitEntry64b new_entry_64b(new_entry, FORMAT);
483 bool succ = atomic_entry.compare_exchange_strong(
484 expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
485 std::memory_order_acquire);
486 return succ;
487 }
488
489 void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
490 const SequenceNumber& new_max) {
491 ROCKS_LOG_DETAILS(info_log_,
492 "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
493 prev_max, new_max);
494 // When max_evicted_seq_ advances, move older entries from prepared_txns_
495 // to delayed_prepared_. This guarantees that if a seq is lower than max,
496 // then it is not in prepared_txns_ ans save an expensive, synchronized
497 // lookup from a shared set. delayed_prepared_ is expected to be empty in
498 // normal cases.
499 {
500 WriteLock wl(&prepared_mutex_);
501 while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
502 auto to_be_popped = prepared_txns_.top();
503 delayed_prepared_.insert(to_be_popped);
504 ROCKS_LOG_WARN(info_log_,
505 "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
506 " new_max=%" PRIu64 " oldmax=%" PRIu64,
507 static_cast<uint64_t>(delayed_prepared_.size()),
508 to_be_popped, new_max, prev_max);
509 prepared_txns_.pop();
510 delayed_prepared_empty_.store(false, std::memory_order_release);
511 }
512 }
513
514 // With each change to max_evicted_seq_ fetch the live snapshots behind it.
515 // We use max as the version of snapshots to identify how fresh are the
516 // snapshot list. This works because the snapshots are between 0 and
517 // max, so the larger the max, the more complete they are.
518 SequenceNumber new_snapshots_version = new_max;
519 std::vector<SequenceNumber> snapshots;
520 bool update_snapshots = false;
521 if (new_snapshots_version > snapshots_version_) {
522 // This is to avoid updating the snapshots_ if it already updated
523 // with a more recent vesion by a concrrent thread
524 update_snapshots = true;
525 // We only care about snapshots lower then max
526 snapshots = GetSnapshotListFromDB(new_max);
527 }
528 if (update_snapshots) {
529 UpdateSnapshots(snapshots, new_snapshots_version);
530 }
531 auto updated_prev_max = prev_max;
532 while (updated_prev_max < new_max &&
533 !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
534 std::memory_order_acq_rel,
535 std::memory_order_relaxed)) {
536 };
537 }
538
539 const Snapshot* WritePreparedTxnDB::GetSnapshot() {
540 // Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
541 // to WritePreparedTxn::SetSnapshot for more explanation.
542 auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
543 const bool FOR_WW_CONFLICT_CHECK = true;
544 SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
545 assert(snap_impl);
546 EnhanceSnapshot(snap_impl, min_uncommitted);
547 return snap_impl;
548 }
549
550 const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
551 SequenceNumber max) {
552 ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
553 InstrumentedMutex(db_impl_->mutex());
554 return db_impl_->snapshots().GetAll(nullptr, max);
555 }
556
557 void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
558 auto snap_seq = snapshot->GetSequenceNumber();
559 ReleaseSnapshotInternal(snap_seq);
560 db_impl_->ReleaseSnapshot(snapshot);
561 }
562
563 void WritePreparedTxnDB::ReleaseSnapshotInternal(
564 const SequenceNumber snap_seq) {
565 // relax is enough since max increases monotonically, i.e., if snap_seq <
566 // old_max => snap_seq < new_max as well.
567 if (snap_seq < max_evicted_seq_.load(std::memory_order_relaxed)) {
568 // Then this is a rare case that transaction did not finish before max
569 // advances. It is expected for a few read-only backup snapshots. For such
570 // snapshots we might have kept around a couple of entries in the
571 // old_commit_map_. Check and do garbage collection if that is the case.
572 bool need_gc = false;
573 {
574 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
575 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
576 ReadLock rl(&old_commit_map_mutex_);
577 auto prep_set_entry = old_commit_map_.find(snap_seq);
578 need_gc = prep_set_entry != old_commit_map_.end();
579 }
580 if (need_gc) {
581 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
582 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
583 WriteLock wl(&old_commit_map_mutex_);
584 old_commit_map_.erase(snap_seq);
585 old_commit_map_empty_.store(old_commit_map_.empty(),
586 std::memory_order_release);
587 }
588 }
589 }
590
591 void WritePreparedTxnDB::UpdateSnapshots(
592 const std::vector<SequenceNumber>& snapshots,
593 const SequenceNumber& version) {
594 ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
595 version);
596 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
597 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
598 #ifndef NDEBUG
599 size_t sync_i = 0;
600 #endif
601 ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
602 WriteLock wl(&snapshots_mutex_);
603 snapshots_version_ = version;
604 // We update the list concurrently with the readers.
605 // Both new and old lists are sorted and the new list is subset of the
606 // previous list plus some new items. Thus if a snapshot repeats in
607 // both new and old lists, it will appear upper in the new list. So if
608 // we simply insert the new snapshots in order, if an overwritten item
609 // is still valid in the new list is either written to the same place in
610 // the array or it is written in a higher palce before it gets
611 // overwritten by another item. This guarantess a reader that reads the
612 // list bottom-up will eventaully see a snapshot that repeats in the
613 // update, either before it gets overwritten by the writer or
614 // afterwards.
615 size_t i = 0;
616 auto it = snapshots.begin();
617 for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
618 snapshot_cache_[i].store(*it, std::memory_order_release);
619 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
620 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
621 }
622 #ifndef NDEBUG
623 // Release the remaining sync points since they are useless given that the
624 // reader would also use lock to access snapshots
625 for (++sync_i; sync_i <= 10; ++sync_i) {
626 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
627 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
628 }
629 #endif
630 snapshots_.clear();
631 for (; it != snapshots.end(); it++) {
632 // Insert them to a vector that is less efficient to access
633 // concurrently
634 snapshots_.push_back(*it);
635 }
636 // Update the size at the end. Otherwise a parallel reader might read
637 // items that are not set yet.
638 snapshots_total_.store(snapshots.size(), std::memory_order_release);
639 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
640 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
641 }
642
643 void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
644 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
645 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
646 #ifndef NDEBUG
647 size_t sync_i = 0;
648 #endif
649 // First check the snapshot cache that is efficient for concurrent access
650 auto cnt = snapshots_total_.load(std::memory_order_acquire);
651 // The list might get updated concurrently as we are reading from it. The
652 // reader should be able to read all the snapshots that are still valid
653 // after the update. Since the survived snapshots are written in a higher
654 // place before gets overwritten the reader that reads bottom-up will
655 // eventully see it.
656 const bool next_is_larger = true;
657 SequenceNumber snapshot_seq = kMaxSequenceNumber;
658 size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
659 for (; 0 < ip1; ip1--) {
660 snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
661 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
662 ++sync_i);
663 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
664 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
665 snapshot_seq, !next_is_larger)) {
666 break;
667 }
668 }
669 #ifndef NDEBUG
670 // Release the remaining sync points before accquiring the lock
671 for (++sync_i; sync_i <= 10; ++sync_i) {
672 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
673 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
674 }
675 #endif
676 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
677 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
678 if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
679 snapshot_seq < evicted.prep_seq)) {
680 // Then access the less efficient list of snapshots_
681 WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
682 ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
683 ReadLock rl(&snapshots_mutex_);
684 // Items could have moved from the snapshots_ to snapshot_cache_ before
685 // accquiring the lock. To make sure that we do not miss a valid snapshot,
686 // read snapshot_cache_ again while holding the lock.
687 for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
688 snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
689 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
690 snapshot_seq, next_is_larger)) {
691 break;
692 }
693 }
694 for (auto snapshot_seq_2 : snapshots_) {
695 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
696 snapshot_seq_2, next_is_larger)) {
697 break;
698 }
699 }
700 }
701 }
702
703 bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
704 const uint64_t& prep_seq, const uint64_t& commit_seq,
705 const uint64_t& snapshot_seq, const bool next_is_larger = true) {
706 // If we do not store an entry in old_commit_map_ we assume it is committed in
707 // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
708 // the snapshot so we need not to keep the entry around for this snapshot.
709 if (commit_seq <= snapshot_seq) {
710 // continue the search if the next snapshot could be smaller than commit_seq
711 return !next_is_larger;
712 }
713 // then snapshot_seq < commit_seq
714 if (prep_seq <= snapshot_seq) { // overlapping range
715 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
716 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
717 WriteLock wl(&old_commit_map_mutex_);
718 old_commit_map_empty_.store(false, std::memory_order_release);
719 auto& vec = old_commit_map_[snapshot_seq];
720 vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
721 // We need to store it once for each overlapping snapshot. Returning true to
722 // continue the search if there is more overlapping snapshot.
723 return true;
724 }
725 // continue the search if the next snapshot could be larger than prep_seq
726 return next_is_larger;
727 }
728
729 WritePreparedTxnDB::~WritePreparedTxnDB() {
730 // At this point there could be running compaction/flush holding a
731 // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
732 // Make sure those jobs finished before destructing WritePreparedTxnDB.
733 db_impl_->CancelAllBackgroundWork(true /*wait*/);
734 }
735
736 void SubBatchCounter::InitWithComp(const uint32_t cf) {
737 auto cmp = comparators_[cf];
738 keys_[cf] = CFKeys(SetComparator(cmp));
739 }
740
741 void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
742 CFKeys& cf_keys = keys_[cf];
743 if (cf_keys.size() == 0) { // just inserted
744 InitWithComp(cf);
745 }
746 auto it = cf_keys.insert(key);
747 if (it.second == false) { // second is false if a element already existed.
748 batches_++;
749 keys_.clear();
750 InitWithComp(cf);
751 keys_[cf].insert(key);
752 }
753 }
754
755 } // namespace rocksdb
756 #endif // ROCKSDB_LITE