// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/write_prepared_txn_db.h"

#include <inttypes.h>
#include <algorithm>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

namespace rocksdb {

Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)),
                    uint64_t) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}

Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
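  // WritePrepared may hand the memtable a batch that contains duplicate keys
  // (such a batch is split into sub-batches, each receiving its own sequence
  // number; see WriteInternal), so the memtable implementation must be able
  // to cope with duplicate insertions; hence the check below.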
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}

Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}

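// Write() bypasses per-key locking when the caller vouches for the guarantees
// encoded in `optimizations`. Illustrative usage (txn_db and batch are
// placeholders, not part of this file):
//
//   TransactionDBWriteOptimizations optimizations;
//   optimizations.skip_concurrency_control = true;  // no conflicting writers
//   optimizations.skip_duplicate_key_check = true;  // batch has no dup keys
//   Status s = txn_db->Write(WriteOptions(), optimizations, &batch);
//
// If either guarantee cannot be made, leave the flag false; the checked,
// slower path is taken instead.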
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::Write(opts, updates);
  }
}

Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    assert(s.ok());
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  bool sync = write_options.sync;
  if (!do_one_write) {
    // No need to sync on the first write
    write_options.sync = false;
  }
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(batch);
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  const bool kSeparatePrepareCommitBatches = true;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, db_impl_, batch_cnt,
      db_impl_->immutable_db_options().two_write_queues,
      !kSeparatePrepareCommitBatches);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
                               no_log_ref, !DISABLE_MEMTABLE, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  // Restore the original value of sync
  write_options.sync = sync;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  const size_t ONE_BATCH = 1;
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  RemovePrepared(prepare_seq, batch_cnt);
  return s;
}

Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  // We are fine with the latest committed value. This could be done by
  // specifying the snapshot as kMaxSequenceNumber.
  SequenceNumber seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    min_uncommitted = SmallestUnCommittedSeq();
  }
  WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
  bool* dont_care = nullptr;
  // Note: no need to specify a snapshot for read options as no specific
  // snapshot is requested by the user.
  return db_impl_->GetImpl(options, column_family, key, value, dont_care,
                           &callback);
}

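// The per-CF comparator and handle maps are rebuilt wholesale and swapped in,
// rather than mutated in place.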
void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    std::string* value = values ? &(*values)[i] : nullptr;
    stat_list[i] = this->Get(options, column_family[i], keys[i], value);
  }
  return stat_list;
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted), snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  !ALLOW_BLOB, !ALLOW_REFRESH);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}

void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}

void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max) {
  prepared_mutex_.AssertHeld();
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, and saves us an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  ROCKS_LOG_DETAILS(
      info_log_,
      "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
      prepared_txns_.empty(),
      prepared_txns_.empty() ? 0 : prepared_txns_.top());
  while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
    auto to_be_popped = prepared_txns_.top();
    delayed_prepared_.insert(to_be_popped);
    ROCKS_LOG_WARN(info_log_,
                   "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                   " new_max=%" PRIu64 ")",
                   static_cast<uint64_t>(delayed_prepared_.size()),
                   to_be_popped, new_max);
    prepared_txns_.pop();
    delayed_prepared_empty_.store(false, std::memory_order_release);
  }
}

void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
                    seq, max_evicted_seq_.load());
  TEST_SYNC_POINT("AddPrepared::begin:pause");
  TEST_SYNC_POINT("AddPrepared::begin:resume");
  WriteLock wl(&prepared_mutex_);
  prepared_txns_.push(seq);
  auto new_max = future_max_evicted_seq_.load();
  if (UNLIKELY(seq <= new_max)) {
    // This should not happen in the normal case
    ROCKS_LOG_ERROR(
        info_log_,
        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
        " <= %" PRIu64,
        seq, new_max);
    CheckPreparedAgainstMax(new_max);
  }
  TEST_SYNC_POINT("AddPrepared::end");
}

void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
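  // The commit cache is a fixed-size array indexed by prepare_seq modulo its
  // size, so an insertion may evict the previous occupant of the slot. For
  // instance, with a (hypothetical) COMMIT_CACHE_SIZE of 8, prepare_seq 13
  // lands in slot 5 and evicts whichever older entry was stored there.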
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      auto last = db_impl_->GetLastPublishedSequence();  // could be 0
      SequenceNumber max_evicted_seq;
      if (LIKELY(evicted.commit_seq < last)) {
        assert(last > 0);
        // Inc max in larger steps to avoid frequent updates
        max_evicted_seq =
            std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
      } else {
        // Legit when a commit entry in a write batch overwrites the previous
        // one.
        max_evicted_seq = evicted.commit_seq;
      }
      ROCKS_LOG_DETAILS(info_log_,
                        "%" PRIu64 " Evicting %" PRIu64 ",%" PRIu64
                        " with max %" PRIu64 " => %" PRIu64,
                        prepare_seq, evicted.prep_seq, evicted.commit_seq,
                        prev_max, max_evicted_seq);
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
      WriteLock wl(&prepared_mutex_);
      for (auto dp : delayed_prepared_) {
        if (dp == evicted.prep_seq) {
          // This is a rare case in which the txn is committed but
          // prepared_txns_ is not cleaned up yet. Refer to the
          // delayed_prepared_commits_ definition for why it should be kept
          // updated.
          delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
          ROCKS_LOG_DEBUG(info_log_,
                          "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
                          evicted.prep_seq, evicted.commit_seq);
          break;
        }
      }
    }
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  TEST_SYNC_POINT_CALLBACK(
      "RemovePrepared:Start",
      const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  ROCKS_LOG_DETAILS(info_log_,
                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                    prepare_seq, batch_cnt);
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      auto it = delayed_prepared_commits_.find(prepare_seq + i);
      if (it != delayed_prepared_commits_.end()) {
        ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
                          prepare_seq + i);
        delayed_prepared_commits_.erase(it);
      }
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}

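// Advances max_evicted_seq_ (and future_max_evicted_seq_) monotonically: the
// CAS loops below exit early if a concurrent thread has already pushed the
// value past new_max.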
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // Declare the intention before getting the snapshot list from the DB. This
  // helps a concurrent GetSnapshot to wait and catch up with
  // future_max_evicted_seq_ if it has not already. Otherwise a new snapshot,
  // taken while we ask the DB for snapshots smaller than the future max,
  // could be missed.
  auto updated_future_max = prev_max;
  while (updated_future_max < new_max &&
         !future_max_evicted_seq_.compare_exchange_weak(
             updated_future_max, new_max, std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
  }

  {
    WriteLock wl(&prepared_mutex_);
    CheckPreparedAgainstMax(new_max);
  }

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of the snapshot list to identify how fresh it
  // is. This works because the snapshots are between 0 and max, so the larger
  // the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it has already been updated
    // with a more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
    if (!snapshots.empty()) {
      WriteLock wl(&old_commit_map_mutex_);
      for (auto snap : snapshots) {
        // This allows IsInSnapshot to tell apart reads from invalid snapshots
        // from reads of committed values in valid snapshots.
        old_commit_map_[snap];
      }
      old_commit_map_empty_.store(false, std::memory_order_release);
    }
  }
  auto updated_prev_max = prev_max;
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  }
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  const bool kForWWConflictCheck = true;
  return GetSnapshotInternal(!kForWWConflictCheck);
}

SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
    bool for_ww_conflict_check) {
  // Note: for this optimization, setting the last sequence number and
  // obtaining the smallest uncommitted seq should be done atomically. However,
  // to avoid the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking
  // the snapshot. Since we always update the list of unprepared seq (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would be obtained otherwise atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  assert(snap_impl);
  SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  // Note: Check against future_max_evicted_seq_ (in contrast with
  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
    // There is a very rare case in which a commit entry evicts another commit
    // entry that is not published yet, thus advancing max evicted seq beyond
    // the last published seq. This case is not likely in real-world setups so
    // we handle it with a few retries.
    size_t retry = 0;
    SequenceNumber max;
    while ((max = future_max_evicted_seq_.load()) != 0 &&
           snap_impl->GetSequenceNumber() <= max && retry < 100) {
      ROCKS_LOG_WARN(info_log_,
                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
                     " retry %" ROCKSDB_PRIszt,
                     snap_impl->GetSequenceNumber(), max, retry);
      ReleaseSnapshot(snap_impl);
      // Wait for the last visible seq to catch up with max, and also go
      // beyond it by one.
      AdvanceSeqByOne();
      snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
      assert(snap_impl);
      retry++;
    }
    assert(snap_impl->GetSequenceNumber() > max);
    if (snap_impl->GetSequenceNumber() <= max) {
      throw std::runtime_error(
          "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
          " after " + ToString(retry) +
          " retries is still less than future_max_evicted_seq_ " +
          ToString(max));
    }
  }
  EnhanceSnapshot(snap_impl, min_uncommitted);
  ROCKS_LOG_DETAILS(
      db_impl_->immutable_db_options().info_log,
      "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  return snap_impl;
}

void WritePreparedTxnDB::AdvanceSeqByOne() {
  // Committing an empty transaction will i) let the max evicted entry be
  // published, i.e., max == last_published, and ii) increase the last
  // published to be one beyond max, i.e., max < last_published.
  WriteOptions woptions;
  TransactionOptions txn_options;
  Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  std::hash<std::thread::id> hasher;
  char name[64];
  snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
  assert(strlen(name) < 64 - 1);
  Status s = txn0->SetName(name);
  assert(s.ok());
  if (s.ok()) {
    // Without prepare it would simply skip the commit
    s = txn0->Prepare();
  }
  assert(s.ok());
  if (s.ok()) {
    s = txn0->Commit();
  }
  assert(s.ok());
  delete txn0;
}

const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  db_impl_->mutex()->AssertHeld();
  return db_impl_->snapshots().GetAll(nullptr, max);
}

void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // TODO(myabandeh): relaxed ordering should be enough since the
  // synchronization is already done by snapshots_mutex_, under which this
  // function is called.
  if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
    // Then this is a rare case in which the transaction did not finish before
    // max advanced. It is expected for a few read-only backup snapshots. For
    // such snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}

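// Both lists are sorted; snapshots present in old_snapshots but missing from
// new_snapshots have been released. For example (illustrative numbers), with
// old = {3, 5, 7} and new = {5, 9}, seq 3 and 7 are released and cleaned up
// while 5 survives; 9 is new and needs no cleanup.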
void WritePreparedTxnDB::CleanupReleasedSnapshots(
    const std::vector<SequenceNumber>& new_snapshots,
    const std::vector<SequenceNumber>& old_snapshots) {
  auto newi = new_snapshots.begin();
  auto oldi = old_snapshots.begin();
  for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
    assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
    if (*newi == *oldi) {    // still not released
      auto value = *newi;
      while (newi != new_snapshots.end() && *newi == value) {
        newi++;
      }
      while (oldi != old_snapshots.end() && *oldi == value) {
        oldi++;
      }
    } else {
      assert(*newi > *oldi);  // *oldi is released
      ReleaseSnapshotInternal(*oldi);
      oldi++;
    }
  }
  // Everything remaining in old_snapshots is released and must be cleaned up
  for (; oldi != old_snapshots.end(); oldi++) {
    ReleaseSnapshotInternal(*oldi);
  }
}

void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both the new and the old list are sorted and the new list is a subset of
  // the previous list plus some new items. Thus if a snapshot repeats in both
  // the new and the old list, it will appear higher in the new list. So if we
  // simply insert the new snapshots in order, an overwritten item that is
  // still valid in the new list is either written to the same place in the
  // array or to a higher place before it gets overwritten by another item.
  // This guarantees that a reader that reads the list bottom-up will
  // eventually see a snapshot that repeats in the update, either before it
  // gets overwritten by the writer or afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use a lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); it++) {
    // Insert them into a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);

  // Note: this must be done after the snapshots data structures are updated
  // with the new list of snapshots.
  CleanupReleasedSnapshots(snapshots, snapshots_all_);
  snapshots_all_ = snapshots;

  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}

void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before they get overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  // We will set this to true if the borderline snapshot suggests it.
  bool search_larger_list = false;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    SequenceNumber snapshot_seq =
        snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (ip1 == SNAPSHOT_CACHE_SIZE) {  // borderline snapshot
      // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
      // then later also continue the search to larger snapshots
      search_larger_list = snapshot_seq < evicted.commit_seq;
    }
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
                   "> with %" ROCKSDB_PRIszt " snapshots",
                   evicted.prep_seq, evicted.commit_seq, cnt);
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from snapshots_ to snapshot_cache_ before we
    // acquired the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      SequenceNumber snapshot_seq =
          snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}

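// A commit entry overlaps a snapshot when prep_seq <= snapshot_seq <
// commit_seq: the txn was prepared before the snapshot but committed after
// it, so the snapshot must not see it. E.g. (illustrative numbers), evicting
// <prep=10, commit=20> with a live snapshot at 15 records 10 under
// old_commit_map_[15]; a snapshot at 25 needs no entry since 20 <= 25.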
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed
  // in all snapshots. If commit_seq <= snapshot_seq, it is considered already
  // in the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than
    // commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "old_commit_map_mutex_ overhead for %" PRIu64
                   " commit entry: <%" PRIu64 ",%" PRIu64 ">",
                   snapshot_seq, prep_seq, commit_seq);
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search if there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}

WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  db_impl_->CancelAllBackgroundWork(true /*wait*/);
}

void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}

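// A sub-batch is a maximal prefix of the remaining batch with no duplicate
// keys; each duplicate starts a new sub-batch. E.g. (illustrative), a batch
// writing k1, k2, k1, k3 counts as 2 sub-batches: {k1, k2} and {k1, k3}.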
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}

}  // namespace rocksdb
#endif  // ROCKSDB_LITE