// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn_db.h"

#include <algorithm>
#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

namespace ROCKSDB_NAMESPACE {

Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    ordered_seq_cnt[seq] = cnt;
  }
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}

Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}

Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}
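
// Usage sketch: a minimal client-side lifecycle for the path above, assuming
// a TransactionDB opened with the WRITE_PREPARED txn_write_policy (names and
// error handling are illustrative, not part of this file):
//
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions(),
//                                               TransactionOptions(), nullptr);
//   txn->SetName("txn1");       // required before Prepare()
//   txn->Put("key", "value");   // buffered in the transaction's write batch
//   txn->Prepare();             // first write: data is persisted, not visible
//   txn->Commit();              // publishes the commit sequence number
//   delete txn;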

Status WritePreparedTxnDB::Write(const WriteOptions& opts,
                                 WriteBatch* updates) {
  if (txn_db_options_.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
  } else {
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  }
}

Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
  }
}

Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    if (!s.ok()) {
      return s;
    }
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  // In the absence of Prepare markers, use Noop as a batch separator
  auto s = WriteBatchInternal::InsertNoop(batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  const bool kSeperatePrepareCommitBatches = true;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, db_impl_, batch_cnt,
      db_impl_->immutable_db_options().two_write_queues,
      !kSeperatePrepareCommitBatches);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
                          !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  write_options.disableWAL = true;
  write_options.sync = false;
  const size_t ONE_BATCH = 1;  // Just to inc the seq
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared is called from within PreReleaseCallback
  return s;
}
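
// Illustration of the two-write flow above, with hypothetical numbers: under
// two_write_queues, a batch with batch_cnt == 2 consumes two consecutive seqs
// in the first write, say 10 and 11 (prepare_seq == 10), and the second,
// memtable-disabled write consumes one more, say 12. Its PreReleaseCallback
// then records <10,12> and <11,12> in the commit cache, so readers see both
// sub-batches as committed at seq 12.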

Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  bool* dont_care = nullptr;
  DBImpl::GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.value_found = dont_care;
  get_impl_options.callback = &callback;
  auto res = db_impl_->GetImpl(options, key, get_impl_options);
  if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
                                                  backed_by_snapshot))) {
    return res;
  } else {
    WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    stat_list[i] = this->Get(options, column_family[i], keys[i], &(*values)[i]);
  }
  return stat_list;
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
        snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd =
        static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  !ALLOW_BLOB, !ALLOW_REFRESH);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}

void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
  dummy_max_snapshot_.number_ = kMaxSequenceNumber;
}
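
// Illustration of the sizing above: the commit cache is a fixed ring indexed
// by seq % COMMIT_CACHE_SIZE. With an illustrative COMMIT_CACHE_SIZE of 8,
// seqs 3 and 11 share slot 3, so committing seq 11 evicts the entry for 3 and
// max_evicted_seq_ must first advance to at least 3's commit seq. Deriving
// INC_STEP_FOR_MAX_EVICTED from COMMIT_CACHE_SIZE / 100 caps this at roughly
// 100 advances per wrap-around of the cache.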

void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
                                                 bool locked) {
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, saving an expensive, synchronized
  // lookup in a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  ROCKS_LOG_DETAILS(
      info_log_,
      "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
      prepared_txns_.empty(),
      prepared_txns_.empty() ? 0 : prepared_txns_.top());
  const SequenceNumber prepared_top = prepared_txns_.top();
  const bool empty = prepared_top == kMaxSequenceNumber;
  // Preliminary check to avoid the synchronization cost
  if (!empty && prepared_top <= new_max) {
    if (locked) {
      // Needed to avoid double locking in pop().
      prepared_txns_.push_pop_mutex()->Unlock();
    }
    WriteLock wl(&prepared_mutex_);
    // Need to fetch fresh values of ::top after mutex is acquired
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 ")",
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max);
      delayed_prepared_empty_.store(false, std::memory_order_release);
      // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
      // there will be a point in time that the entry is neither in
      // prepared_txns_ nor in delayed_prepared_, which will not be checked if
      // delayed_prepared_empty_ is false.
      prepared_txns_.pop();
    }
    if (locked) {
      prepared_txns_.push_pop_mutex()->Lock();
    }
  }
}

void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
                    seq, max_evicted_seq_.load());
  TEST_SYNC_POINT("AddPrepared::begin:pause");
  TEST_SYNC_POINT("AddPrepared::begin:resume");
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Lock();
  }
  prepared_txns_.push_pop_mutex()->AssertHeld();
  prepared_txns_.push(seq);
  auto new_max = future_max_evicted_seq_.load();
  if (UNLIKELY(seq <= new_max)) {
    // This should not happen in normal case
    ROCKS_LOG_ERROR(
        info_log_,
        "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
        " <= %" PRIu64,
        seq, new_max);
    CheckPreparedAgainstMax(new_max, true /*locked*/);
  }
  if (!locked) {
    prepared_txns_.push_pop_mutex()->Unlock();
  }
  TEST_SYNC_POINT("AddPrepared::end");
}

void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      auto last = db_impl_->GetLastPublishedSequence();  // could be 0
      SequenceNumber max_evicted_seq;
      if (LIKELY(evicted.commit_seq < last)) {
        assert(last > 0);
        // Inc max in larger steps to avoid frequent updates
        max_evicted_seq =
            std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
      } else {
        // legit when a commit entry in a write batch overwrites the previous
        // one
        max_evicted_seq = evicted.commit_seq;
      }
      ROCKS_LOG_DETAILS(info_log_,
                        "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
                        " => %lu",
                        prepare_seq, evicted.prep_seq, evicted.commit_seq,
                        prev_max, max_evicted_seq);
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
    if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
      WriteLock wl(&prepared_mutex_);
      for (auto dp : delayed_prepared_) {
        if (dp == evicted.prep_seq) {
          // This is a rare case in which the txn is committed but
          // prepared_txns_ is not cleaned up yet. Refer to
          // delayed_prepared_commits_ definition for why it should be kept
          // updated.
          delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
          ROCKS_LOG_DEBUG(info_log_,
                          "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
                          evicted.prep_seq, evicted.commit_seq);
          break;
        }
      }
    }
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}
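
// Worked example of the advance above, with illustrative numbers: if
// prev_max = 100, evicted.commit_seq = 120, last published = 10000, and
// INC_STEP_FOR_MAX_EVICTED = 50, the new max becomes min(120 + 50, 9999) =
// 170. Jumping past the evicted commit seq amortizes the relatively expensive
// AdvanceMaxEvictedSeq over many subsequent evictions.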

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  TEST_SYNC_POINT_CALLBACK(
      "RemovePrepared:Start",
      const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
  TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
  ROCKS_LOG_DETAILS(info_log_,
                    "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
                    prepare_seq, batch_cnt);
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      auto it = delayed_prepared_commits_.find(prepare_seq + i);
      if (it != delayed_prepared_commits_.end()) {
        ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
                          prepare_seq + i);
        delayed_prepared_commits_.erase(it);
      }
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}
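
// The three primitives above form an optimistic read-modify-write over one
// 64-bit commit-cache slot. A sketch of how AddCommitted combines them
// (variable names illustrative):
//
//   CommitEntry64b expected;                   // filled by GetCommitEntry
//   GetCommitEntry(idx, &expected, &evicted);  // observe the current slot
//   // ... decide what the eviction implies ...
//   bool won = ExchangeCommitEntry(idx, expected, {prep_seq, commit_seq});
//   // won == false means another writer raced us; AddCommitted retries.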

void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // Declare the intention before getting snapshot from the DB. This helps a
  // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
  // it has not already. Otherwise the new snapshot could be missed when we ask
  // the DB for snapshots smaller than the future max.
  auto updated_future_max = prev_max;
  while (updated_future_max < new_max &&
         !future_max_evicted_seq_.compare_exchange_weak(
             updated_future_max, new_max, std::memory_order_acq_rel,
             std::memory_order_relaxed)) {
  }

  CheckPreparedAgainstMax(new_max, false /*locked*/);

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of snapshots to identify how fresh the snapshot
  // list is. This works because the snapshots are between 0 and max, so the
  // larger the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it has already been updated
    // with a more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
    if (!snapshots.empty()) {
      WriteLock wl(&old_commit_map_mutex_);
      for (auto snap : snapshots) {
        // This allows IsInSnapshot to tell apart reads from invalid snapshots
        // from reads of committed values in valid snapshots.
        old_commit_map_[snap];
      }
      old_commit_map_empty_.store(false, std::memory_order_release);
    }
  }
  auto updated_prev_max = prev_max;
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
  TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  }
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  const bool kForWWConflictCheck = true;
  return GetSnapshotInternal(!kForWWConflictCheck);
}

SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
    bool for_ww_conflict_check) {
  // Note: for this optimization setting the last sequence number and obtaining
  // the smallest uncommitted seq should be done atomically. However to avoid
  // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
  // snapshot. Since we always update the list of unprepared seq (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would be obtained otherwise atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
  assert(snap_impl);
  SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
  // Note: Check against future_max_evicted_seq_ (in contrast with
  // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
  if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
    // There is a very rare case in which the commit entry evicts another
    // commit entry that is not published yet thus advancing max evicted seq
    // beyond the last published seq. This case is not likely in real-world
    // setup so we handle it with a few retries.
    size_t retry = 0;
    SequenceNumber max;
    while ((max = future_max_evicted_seq_.load()) != 0 &&
           snap_impl->GetSequenceNumber() <= max && retry < 100) {
      ROCKS_LOG_WARN(info_log_,
                     "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
                     " retry %" ROCKSDB_PRIszt,
                     snap_impl->GetSequenceNumber(), max, retry);
      ReleaseSnapshot(snap_impl);
      // Wait for last visible seq to catch up with max, and also go beyond it
      // by one.
      AdvanceSeqByOne();
      snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
      assert(snap_impl);
      retry++;
    }
    assert(snap_impl->GetSequenceNumber() > max);
    if (snap_impl->GetSequenceNumber() <= max) {
      throw std::runtime_error(
          "Snapshot seq " + ToString(snap_impl->GetSequenceNumber()) +
          " after " + ToString(retry) +
          " retries is still less than future_max_evicted_seq_" +
          ToString(max));
    }
  }
  EnhanceSnapshot(snap_impl, min_uncommitted);
  ROCKS_LOG_DETAILS(
      db_impl_->immutable_db_options().info_log,
      "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
      snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
  TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
  return snap_impl;
}
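
// Rough visibility picture for the snapshot returned above, with illustrative
// numbers: pairing snap_seq = 50 with min_uncommitted = 40 lets a reader
// treat any value with seq < 40 as committed and visible, any value with
// seq > 50 as invisible, and consult the commit cache / old_commit_map_
// machinery only for seqs in [40, 50].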

void WritePreparedTxnDB::AdvanceSeqByOne() {
  // Inserting an empty value will i) let the max evicted entry be published,
  // i.e., max == last_published, and ii) increase the last published to be
  // one beyond max, i.e., max < last_published.
  WriteOptions woptions;
  TransactionOptions txn_options;
  Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
  std::hash<std::thread::id> hasher;
  char name[64];
  snprintf(name, 64, "txn%" ROCKSDB_PRIszt,
           hasher(std::this_thread::get_id()));
  assert(strlen(name) < 64 - 1);
  Status s = txn0->SetName(name);
  assert(s.ok());
  if (s.ok()) {
    // Without prepare it would simply skip the commit
    s = txn0->Prepare();
  }
  assert(s.ok());
  if (s.ok()) {
    s = txn0->Commit();
  }
  assert(s.ok());
  delete txn0;
}

const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  InstrumentedMutexLock dblock(db_impl_->mutex());
  db_impl_->mutex()->AssertHeld();
  return db_impl_->snapshots().GetAll(nullptr, max);
}

void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // TODO(myabandeh): relaxed should be enough since the synchronization is
  // already done by snapshots_mutex_ under which this function is called.
  if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
    // Then this is a rare case in which the transaction did not finish before
    // max advanced. It is expected for a few read-only backup snapshots. For
    // such snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
                     snap_seq);
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}

void WritePreparedTxnDB::CleanupReleasedSnapshots(
    const std::vector<SequenceNumber>& new_snapshots,
    const std::vector<SequenceNumber>& old_snapshots) {
  auto newi = new_snapshots.begin();
  auto oldi = old_snapshots.begin();
  for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
    assert(*newi >= *oldi);  // cannot have new snapshots with lower seq
    if (*newi == *oldi) {    // still not released
      auto value = *newi;
      while (newi != new_snapshots.end() && *newi == value) {
        newi++;
      }
      while (oldi != old_snapshots.end() && *oldi == value) {
        oldi++;
      }
    } else {
      assert(*newi > *oldi);  // *oldi is released
      ReleaseSnapshotInternal(*oldi);
      oldi++;
    }
  }
  // Everything remaining in old_snapshots has been released and must be
  // cleaned up
  for (; oldi != old_snapshots.end(); oldi++) {
    ReleaseSnapshotInternal(*oldi);
  }
}
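
// Example of the merge walk above: with old_snapshots = {3, 4, 5} and
// new_snapshots = {3, 5, 9}, the walk skips 3 and 5 (still live in both
// lists), releases 4 when it observes *newi (5) > *oldi (4), and leaves 9
// alone; only entries missing from the new list are released.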

void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear higher in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item
  // that is still valid in the new list is either written to the same place
  // in the array or to a higher place before it gets overwritten by another
  // item. This guarantees that a reader that reads the list bottom-up will
  // eventually see a snapshot that repeats in the update, either before it
  // gets overwritten by the writer or afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); ++it) {
    // Insert them to a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);

  // Note: this must be done after the snapshots data structures are updated
  // with the new list of snapshots.
  CleanupReleasedSnapshots(snapshots, snapshots_all_);
  snapshots_all_ = snapshots;

  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}

void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before they get overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  // We will set it to true if the borderline snapshot suggests that.
  bool search_larger_list = false;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    SequenceNumber snapshot_seq =
        snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (ip1 == SNAPSHOT_CACHE_SIZE) {  // borderline snapshot
      // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
      // then later also continue the search to larger snapshots
      search_larger_list = snapshot_seq < evicted.commit_seq;
    }
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
                   "> with %" ROCKSDB_PRIszt " snapshots",
                   evicted.prep_seq, evicted.commit_seq, cnt);
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      SequenceNumber snapshot_seq =
          snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}

bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed
  // in all snapshots. If commit_seq <= snapshot_seq, it is considered already
  // in the snapshot so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than
    // commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_,
                   "old_commit_map_mutex_ overhead for %" PRIu64
                   " commit entry: <%" PRIu64 ",%" PRIu64 ">",
                   snapshot_seq, prep_seq, commit_seq);
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search if there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}
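
// Worked example: for a commit entry <prep_seq = 3, commit_seq = 8> checked
// against snapshot_seq = 5, we have 5 < 8 and 3 <= 5, so the ranges overlap:
// 3 is recorded under old_commit_map_[5] and IsInSnapshot will report seq 3
// as invisible to snapshot 5. With snapshot_seq = 9 instead, commit_seq <=
// snapshot_seq, nothing is stored, and the write simply counts as committed
// in that snapshot.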

WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  if (!db_impl_->shutting_down_) {
    db_impl_->CancelAllBackgroundWork(true /*wait*/);
  }
}

void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}

void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}
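
// Example: iterating a batch of Put(a), Put(b), Put(a) inserts "a" and "b"
// successfully, then fails on the duplicate "a" (it.second == false), so
// batches_ is bumped and the key set restarts with "a". The batch is thus
// counted as 2 sub-batches, which is the batch_cnt that WriteInternal
// computes for batches with duplicate keys.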

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE