// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/write_prepared_txn_db.h"

#include <inttypes.h>
#include <algorithm>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

namespace rocksdb {

Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  for (auto rtxn : rtxns) {
    // There should be only one batch for WritePrepared policy.
    assert(rtxn.second->batches_.size() == 1);
    const auto& seq = rtxn.second->batches_.begin()->first;
    const auto& batch_info = rtxn.second->batches_.begin()->second;
    auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    virtual Status Callback(SequenceNumber commit_seq,
                            bool is_mem_disabled) override {
#ifdef NDEBUG
      (void)is_mem_disabled;
#endif
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}

Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}

Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}

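// The optimization hints consumed below let a caller that has already ensured
// key ownership skip row locking, and a caller that knows the batch contains
// no duplicate keys skip sub-batch counting. A minimal, hypothetical usage
// sketch (the batch contents and the txn_db handle are assumed):
//
//   TransactionDBWriteOptimizations optimizations;
//   optimizations.skip_concurrency_control = true;  // keys locked externally
//   optimizations.skip_duplicate_key_check = true;  // no duplicate keys
//   Status s = txn_db->Write(WriteOptions(), optimizations, &batch);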
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::Write(opts, updates);
  }
}

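// WriteInternal commits a batch outside of a 2PC transaction. With a single
// write queue it writes the batch and publishes the commit mapping in one
// WriteImpl call; with two write queues it first writes the data (unsynced,
// memtable enabled), then performs a second, memtable-disabled write whose
// only job is to allocate a commit sequence number and publish it to readers.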
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break, since no seq is
    // allocated for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    assert(s.ok());
    batch_cnt = counter.BatchCount();
    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                      static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  bool sync = write_options.sync;
  if (!do_one_write) {
    // No need to sync on the first write
    write_options.sync = false;
  }
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(batch);
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      this, batch_cnt, db_impl_->immutable_db_options().two_write_queues);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
                               no_log_ref, !DISABLE_MEMTABLE, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  // Restore the original value of sync
  write_options.sync = sync;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  const size_t ONE_BATCH = 1;
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  RemovePrepared(prepare_seq, batch_cnt);
  return s;
}

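// Get resolves visibility through a WritePreparedTxnReadCallback: a value is
// returned only if its sequence number is committed with respect to the
// snapshot. A minimal, hypothetical caller that pins a snapshot first:
//
//   ReadOptions ropts;
//   ropts.snapshot = txn_db->GetSnapshot();
//   PinnableSlice value;
//   Status s =
//       txn_db->Get(ropts, txn_db->DefaultColumnFamily(), "key", &value);
//   txn_db->ReleaseSnapshot(ropts.snapshot);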
Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  // We are fine with the latest committed value. This could be done by
  // specifying the snapshot as kMaxSequenceNumber.
  SequenceNumber seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    min_uncommitted = SmallestUnCommittedSeq();
  }
  WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
  bool* dont_care = nullptr;
  // Note: no need to specify a snapshot for read options as no specific
  // snapshot is requested by the user.
  return db_impl_->GetImpl(options, column_family, key, value, dont_care,
                           &callback);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
    if (id != 0) {
      (*handle_map)[id] = h;
    } else {
      // The pointer to the default cf handle in the handles will be deleted.
      // Use the pointer maintained by the db instead.
      (*handle_map)[id] = DefaultColumnFamily();
    }
  }
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.get();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto old_handle_map_ptr = handle_map_.get();
  assert(old_handle_map_ptr);
  auto handle_map =
      new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  (*handle_map)[id] = h;
  cf_map_.reset(cf_map);
  handle_map_.reset(handle_map);
}

std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    std::string* value = values ? &(*values)[i] : nullptr;
    stat_list[i] = this->Get(options, column_family[i], keys[i], value);
  }
  return stat_list;
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted)
      : callback(txn_db, sequence, min_uncommitted), snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
    auto* state =
        new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  !ALLOW_BLOB, !ALLOW_REFRESH);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}

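// The commit cache allocated below is a fixed-size ring of atomic 64-bit
// entries indexed by prepare_seq % COMMIT_CACHE_SIZE (see AddCommitted).
// Writing to a slot evicts whatever entry was there, which is why
// max_evicted_seq_ must be advanced as the cache wraps around.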
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}

void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing", seq);
  assert(seq > max_evicted_seq_);
  if (seq <= max_evicted_seq_) {
    throw std::runtime_error(
        "Added prepare_seq is not larger than max_evicted_seq_: " +
        ToString(seq) + " <= " + ToString(max_evicted_seq_.load()));
  }
  WriteLock wl(&prepared_mutex_);
  prepared_txns_.push(seq);
}

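// AddCommitted publishes the prepare_seq -> commit_seq mapping. Storing into
// the commit cache may evict an older entry; if the evicted commit_seq is
// beyond max_evicted_seq_, the max is advanced first, and the evicted entry
// is checked against live snapshots so overlapping readers keep a record of
// it in old_commit_map_.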
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
                    prepare_seq, commit_seq);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      // Inc max in larger steps to avoid frequent updates
      auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
    return;
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
                                        const size_t batch_cnt) {
  WriteLock wl(&prepared_mutex_);
  for (size_t i = 0; i < batch_cnt; i++) {
    prepared_txns_.erase(prepare_seq + i);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq + i);
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
      std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b =
      commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
          new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}

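// AdvanceMaxEvictedSeq does three things in order: (i) moves prepared seqs
// that fall at or below the new max from prepared_txns_ into
// delayed_prepared_, (ii) refreshes the cached list of live snapshots up to
// the new max, and (iii) CAS-advances max_evicted_seq_ itself, never moving
// it backwards.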
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, and we save an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  {
    WriteLock wl(&prepared_mutex_);
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 " oldmax=%" PRIu64,
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max, prev_max);
      prepared_txns_.pop();
      delayed_prepared_empty_.store(false, std::memory_order_release);
    }
  }

  // With each change to max_evicted_seq_, fetch the live snapshots behind it.
  // We use max as the version of the snapshot list to identify how fresh it
  // is. This works because the snapshots are between 0 and max, so the larger
  // the max, the more complete the list is.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it was already updated with a
    // more recent version by a concurrent thread.
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
  }
  auto updated_prev_max = prev_max;
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  // Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
  // to WritePreparedTxn::SetSnapshot for more explanation.
  auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
  const bool FOR_WW_CONFLICT_CHECK = true;
  SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
  assert(snap_impl);
  EnhanceSnapshot(snap_impl, min_uncommitted);
  return snap_impl;
}

const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  // Hold the db mutex while reading the snapshot list; an unnamed temporary
  // would not actually hold the lock for the scope.
  InstrumentedMutexLock dblock(db_impl_->mutex());
  return db_impl_->snapshots().GetAll(nullptr, max);
}

void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
  auto snap_seq = snapshot->GetSequenceNumber();
  ReleaseSnapshotInternal(snap_seq);
  db_impl_->ReleaseSnapshot(snapshot);
}

void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // relaxed is enough since max increases monotonically, i.e., if snap_seq <
  // old_max => snap_seq < new_max as well.
  if (snap_seq < max_evicted_seq_.load(std::memory_order_relaxed)) {
    // Then this is the rare case in which a transaction did not finish before
    // max advances. It is expected for a few read-only backup snapshots. For
    // such snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}

void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in both new
  // and old lists, it will appear at a higher place in the new list. So if we
  // simply insert the new snapshots in order, an overwritten item that is
  // still valid in the new list is either written to the same place in the
  // array or to a higher place before it gets overwritten by another item.
  // This guarantees that a reader that reads the list bottom-up will
  // eventually see a snapshot that repeats in the update, either before it
  // gets overwritten by the writer or afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use a lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); it++) {
    // Insert them to a vector that is less efficient to access concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}

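// When an entry is evicted from the commit cache, any live snapshot whose
// sequence number falls between the entry's prepare and commit seq can no
// longer resolve the commit from the cache, so the pair must be recorded in
// old_commit_map_. For example, a snapshot at seq 100 overlaps an evicted
// entry <prep=90, commit=110> and needs it kept around, while an entry
// <prep=90, commit=95> is already visible to that snapshot and can be
// dropped.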
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before they get overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
               snapshot_seq < evicted.prep_seq)) {
    // Then access the less efficient list of snapshots_
    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from the snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}

bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed
  // in all snapshots. If commit_seq <= snapshot_seq, it is considered already
  // in the snapshot so we do not need to keep the entry around for this
  // snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than
    // commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search if there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}

WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  db_impl_->CancelAllBackgroundWork(true /*wait*/);
}

void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}

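// AddKey detects duplicate keys while iterating over a write batch: seeing
// the same key twice in the same column family closes the current sub-batch
// and starts a new one. For example, a batch with Put(a), Put(b), Put(a)
// counts as two sub-batches, since the second Put(a) cannot share a
// sub-batch with the first.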
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if the element already existed
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}

}  // namespace rocksdb
#endif  // ROCKSDB_LITE