]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_prepared_txn_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/write_prepared_txn_db.h"
9
10 #include <algorithm>
11 #include <cinttypes>
12 #include <string>
13 #include <unordered_set>
14 #include <vector>
15
16 #include "db/arena_wrapped_db_iter.h"
17 #include "db/db_impl/db_impl.h"
18 #include "logging/logging.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/utilities/transaction_db.h"
22 #include "test_util/sync_point.h"
23 #include "util/cast_util.h"
24 #include "util/mutexlock.h"
25 #include "util/string_util.h"
26 #include "utilities/transactions/pessimistic_transaction.h"
27 #include "utilities/transactions/transaction_db_mutex_impl.h"
28
29 // This function is for testing only. If it returns true, then all entries in
30 // the commit cache will be evicted. Unit and/or stress tests (db_stress)
31 // can implement this function and customize how frequently commit cache
32 // eviction occurs.
33 // TODO: remove this function once we can configure commit cache to be very
34 // small so that eviction occurs very frequently. This requires the commit
35 // cache entry to be able to encode prepare and commit sequence numbers so that
36 // the commit sequence number does not have to be within a certain range of
37 // prepare sequence number.
38 extern "C" bool rocksdb_write_prepared_TEST_ShouldClearCommitCache(void)
39 __attribute__((__weak__));
40
41 namespace ROCKSDB_NAMESPACE {
42
43 Status WritePreparedTxnDB::Initialize(
44 const std::vector<size_t>& compaction_enabled_cf_indices,
45 const std::vector<ColumnFamilyHandle*>& handles) {
46 auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
47 assert(dbimpl != nullptr);
48 auto rtxns = dbimpl->recovered_transactions();
49 std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
50 for (auto rtxn : rtxns) {
51 // There should only one batch for WritePrepared policy.
52 assert(rtxn.second->batches_.size() == 1);
53 const auto& seq = rtxn.second->batches_.begin()->first;
54 const auto& batch_info = rtxn.second->batches_.begin()->second;
55 auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
56 ordered_seq_cnt[seq] = cnt;
57 }
58 // AddPrepared must be called in order
59 for (auto seq_cnt : ordered_seq_cnt) {
60 auto seq = seq_cnt.first;
61 auto cnt = seq_cnt.second;
62 for (size_t i = 0; i < cnt; i++) {
63 AddPrepared(seq + i);
64 }
65 }
66 SequenceNumber prev_max = max_evicted_seq_;
67 SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
68 AdvanceMaxEvictedSeq(prev_max, last_seq);
69 // Create a gap between max and the next snapshot. This simplifies the logic
70 // in IsInSnapshot by not having to consider the special case of max ==
71 // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
72 if (last_seq) {
73 db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
74 db_impl_->versions_->SetLastSequence(last_seq + 1);
75 db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
76 }
77
78 db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
79 // A callback to commit a single sub-batch
80 class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
81 public:
82 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
83 : db_(db) {}
84 Status Callback(SequenceNumber commit_seq,
85 bool is_mem_disabled __attribute__((__unused__)), uint64_t,
86 size_t /*index*/, size_t /*total*/) override {
87 assert(!is_mem_disabled);
88 db_->AddCommitted(commit_seq, commit_seq);
89 return Status::OK();
90 }
91
92 private:
93 WritePreparedTxnDB* db_;
94 };
95 db_impl_->SetRecoverableStatePreReleaseCallback(
96 new CommitSubBatchPreReleaseCallback(this));
97
98 auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
99 handles);
100 return s;
101 }
102
103 Status WritePreparedTxnDB::VerifyCFOptions(
104 const ColumnFamilyOptions& cf_options) {
105 Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
106 if (!s.ok()) {
107 return s;
108 }
109 if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
110 return Status::InvalidArgument(
111 "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
112 "WritePrpeared transactions");
113 }
114 return Status::OK();
115 }
116
117 Transaction* WritePreparedTxnDB::BeginTransaction(
118 const WriteOptions& write_options, const TransactionOptions& txn_options,
119 Transaction* old_txn) {
120 if (old_txn != nullptr) {
121 ReinitializeTransaction(old_txn, write_options, txn_options);
122 return old_txn;
123 } else {
124 return new WritePreparedTxn(this, write_options, txn_options);
125 }
126 }
127
128 Status WritePreparedTxnDB::Write(const WriteOptions& opts,
129 WriteBatch* updates) {
130 if (txn_db_options_.skip_concurrency_control) {
131 // Skip locking the rows
132 const size_t UNKNOWN_BATCH_CNT = 0;
133 WritePreparedTxn* NO_TXN = nullptr;
134 return WriteInternal(opts, updates, UNKNOWN_BATCH_CNT, NO_TXN);
135 } else {
136 return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
137 }
138 }
139
140 Status WritePreparedTxnDB::Write(
141 const WriteOptions& opts,
142 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
143 if (optimizations.skip_concurrency_control) {
144 // Skip locking the rows
145 const size_t UNKNOWN_BATCH_CNT = 0;
146 const size_t ONE_BATCH_CNT = 1;
147 const size_t batch_cnt = optimizations.skip_duplicate_key_check
148 ? ONE_BATCH_CNT
149 : UNKNOWN_BATCH_CNT;
150 WritePreparedTxn* NO_TXN = nullptr;
151 return WriteInternal(opts, updates, batch_cnt, NO_TXN);
152 } else {
153 // TODO(myabandeh): Make use of skip_duplicate_key_check hint
154 // Fall back to unoptimized version
155 return PessimisticTransactionDB::WriteWithConcurrencyControl(opts, updates);
156 }
157 }
158
159 Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
160 WriteBatch* batch, size_t batch_cnt,
161 WritePreparedTxn* txn) {
162 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
163 "CommitBatchInternal");
164 if (batch->Count() == 0) {
165 // Otherwise our 1 seq per batch logic will break since there is no seq
166 // increased for this batch.
167 return Status::OK();
168 }
169
170 if (write_options_orig.protection_bytes_per_key > 0) {
171 auto s = WriteBatchInternal::UpdateProtectionInfo(
172 batch, write_options_orig.protection_bytes_per_key);
173 if (!s.ok()) {
174 return s;
175 }
176 }
177
178 if (batch_cnt == 0) { // not provided, then compute it
179 // TODO(myabandeh): add an option to allow user skipping this cost
180 SubBatchCounter counter(*GetCFComparatorMap());
181 auto s = batch->Iterate(&counter);
182 if (!s.ok()) {
183 return s;
184 }
185 batch_cnt = counter.BatchCount();
186 WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
187 ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
188 static_cast<uint64_t>(batch_cnt));
189 }
190 assert(batch_cnt);
191
192 bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
193 WriteOptions write_options(write_options_orig);
194 // In the absence of Prepare markers, use Noop as a batch separator
195 auto s = WriteBatchInternal::InsertNoop(batch);
196 assert(s.ok());
197 const bool DISABLE_MEMTABLE = true;
198 const uint64_t no_log_ref = 0;
199 uint64_t seq_used = kMaxSequenceNumber;
200 const size_t ZERO_PREPARES = 0;
201 const bool kSeperatePrepareCommitBatches = true;
202 // Since this is not 2pc, there is no need for AddPrepared but having it in
203 // the PreReleaseCallback enables an optimization. Refer to
204 // SmallestUnCommittedSeq for more details.
205 AddPreparedCallback add_prepared_callback(
206 this, db_impl_, batch_cnt,
207 db_impl_->immutable_db_options().two_write_queues,
208 !kSeperatePrepareCommitBatches);
209 WritePreparedCommitEntryPreReleaseCallback update_commit_map(
210 this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
211 PreReleaseCallback* pre_release_callback;
212 if (do_one_write) {
213 pre_release_callback = &update_commit_map;
214 } else {
215 pre_release_callback = &add_prepared_callback;
216 }
217 s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
218 !DISABLE_MEMTABLE, &seq_used, batch_cnt,
219 pre_release_callback);
220 assert(!s.ok() || seq_used != kMaxSequenceNumber);
221 uint64_t prepare_seq = seq_used;
222 if (txn != nullptr) {
223 txn->SetId(prepare_seq);
224 }
225 if (!s.ok()) {
226 return s;
227 }
228 if (do_one_write) {
229 return s;
230 } // else do the 2nd write for commit
231 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
232 "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
233 prepare_seq);
234 // Commit the batch by writing an empty batch to the 2nd queue that will
235 // release the commit sequence number to readers.
236 const size_t ZERO_COMMITS = 0;
237 WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
238 this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
239 WriteBatch empty_batch;
240 write_options.disableWAL = true;
241 write_options.sync = false;
242 const size_t ONE_BATCH = 1; // Just to inc the seq
243 s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
244 no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
245 &update_commit_map_with_prepare);
246 assert(!s.ok() || seq_used != kMaxSequenceNumber);
247 // Note: RemovePrepared is called from within PreReleaseCallback
248 return s;
249 }
250
251 Status WritePreparedTxnDB::Get(const ReadOptions& options,
252 ColumnFamilyHandle* column_family,
253 const Slice& key, PinnableSlice* value) {
254 SequenceNumber min_uncommitted, snap_seq;
255 const SnapshotBackup backed_by_snapshot =
256 AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
257 WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted,
258 backed_by_snapshot);
259 bool* dont_care = nullptr;
260 DBImpl::GetImplOptions get_impl_options;
261 get_impl_options.column_family = column_family;
262 get_impl_options.value = value;
263 get_impl_options.value_found = dont_care;
264 get_impl_options.callback = &callback;
265 auto res = db_impl_->GetImpl(options, key, get_impl_options);
266 if (LIKELY(callback.valid() && ValidateSnapshot(callback.max_visible_seq(),
267 backed_by_snapshot))) {
268 return res;
269 } else {
270 res.PermitUncheckedError();
271 WPRecordTick(TXN_GET_TRY_AGAIN);
272 return Status::TryAgain();
273 }
274 }
275
276 void WritePreparedTxnDB::UpdateCFComparatorMap(
277 const std::vector<ColumnFamilyHandle*>& handles) {
278 auto cf_map = new std::map<uint32_t, const Comparator*>();
279 auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
280 for (auto h : handles) {
281 auto id = h->GetID();
282 const Comparator* comparator = h->GetComparator();
283 (*cf_map)[id] = comparator;
284 if (id != 0) {
285 (*handle_map)[id] = h;
286 } else {
287 // The pointer to the default cf handle in the handles will be deleted.
288 // Use the pointer maintained by the db instead.
289 (*handle_map)[id] = DefaultColumnFamily();
290 }
291 }
292 cf_map_.reset(cf_map);
293 handle_map_.reset(handle_map);
294 }
295
296 void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
297 auto old_cf_map_ptr = cf_map_.get();
298 assert(old_cf_map_ptr);
299 auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
300 auto old_handle_map_ptr = handle_map_.get();
301 assert(old_handle_map_ptr);
302 auto handle_map =
303 new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
304 auto id = h->GetID();
305 const Comparator* comparator = h->GetComparator();
306 (*cf_map)[id] = comparator;
307 (*handle_map)[id] = h;
308 cf_map_.reset(cf_map);
309 handle_map_.reset(handle_map);
310 }
311
312 std::vector<Status> WritePreparedTxnDB::MultiGet(
313 const ReadOptions& options,
314 const std::vector<ColumnFamilyHandle*>& column_family,
315 const std::vector<Slice>& keys, std::vector<std::string>* values) {
316 assert(values);
317 size_t num_keys = keys.size();
318 values->resize(num_keys);
319
320 std::vector<Status> stat_list(num_keys);
321 for (size_t i = 0; i < num_keys; ++i) {
322 stat_list[i] = this->Get(options, column_family[i], keys[i], &(*values)[i]);
323 }
324 return stat_list;
325 }
326
327 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
328 struct WritePreparedTxnDB::IteratorState {
329 IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
330 std::shared_ptr<ManagedSnapshot> s,
331 SequenceNumber min_uncommitted)
332 : callback(txn_db, sequence, min_uncommitted, kBackedByDBSnapshot),
333 snapshot(s) {}
334
335 WritePreparedTxnReadCallback callback;
336 std::shared_ptr<ManagedSnapshot> snapshot;
337 };
338
339 namespace {
340 static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
341 delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
342 }
343 } // anonymous namespace
344
345 Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
346 ColumnFamilyHandle* column_family) {
347 constexpr bool expose_blob_index = false;
348 constexpr bool allow_refresh = false;
349 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
350 SequenceNumber snapshot_seq = kMaxSequenceNumber;
351 SequenceNumber min_uncommitted = 0;
352 if (options.snapshot != nullptr) {
353 snapshot_seq = options.snapshot->GetSequenceNumber();
354 min_uncommitted =
355 static_cast_with_check<const SnapshotImpl>(options.snapshot)
356 ->min_uncommitted_;
357 } else {
358 auto* snapshot = GetSnapshot();
359 // We take a snapshot to make sure that the related data in the commit map
360 // are not deleted.
361 snapshot_seq = snapshot->GetSequenceNumber();
362 min_uncommitted =
363 static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
364 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
365 }
366 assert(snapshot_seq != kMaxSequenceNumber);
367 auto* cfd =
368 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
369 auto* state =
370 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
371 auto* db_iter =
372 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
373 expose_blob_index, allow_refresh);
374 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
375 return db_iter;
376 }
377
378 Status WritePreparedTxnDB::NewIterators(
379 const ReadOptions& options,
380 const std::vector<ColumnFamilyHandle*>& column_families,
381 std::vector<Iterator*>* iterators) {
382 constexpr bool expose_blob_index = false;
383 constexpr bool allow_refresh = false;
384 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
385 SequenceNumber snapshot_seq = kMaxSequenceNumber;
386 SequenceNumber min_uncommitted = 0;
387 if (options.snapshot != nullptr) {
388 snapshot_seq = options.snapshot->GetSequenceNumber();
389 min_uncommitted =
390 static_cast_with_check<const SnapshotImpl>(options.snapshot)
391 ->min_uncommitted_;
392 } else {
393 auto* snapshot = GetSnapshot();
394 // We take a snapshot to make sure that the related data in the commit map
395 // are not deleted.
396 snapshot_seq = snapshot->GetSequenceNumber();
397 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
398 min_uncommitted =
399 static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
400 }
401 iterators->clear();
402 iterators->reserve(column_families.size());
403 for (auto* column_family : column_families) {
404 auto* cfd =
405 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
406 auto* state =
407 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
408 auto* db_iter =
409 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
410 expose_blob_index, allow_refresh);
411 db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
412 iterators->push_back(db_iter);
413 }
414 return Status::OK();
415 }
416
417 void WritePreparedTxnDB::Init(const TransactionDBOptions& txn_db_opts) {
418 // Adcance max_evicted_seq_ no more than 100 times before the cache wraps
419 // around.
420 INC_STEP_FOR_MAX_EVICTED =
421 std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
422 snapshot_cache_ = std::unique_ptr<std::atomic<SequenceNumber>[]>(
423 new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
424 commit_cache_ = std::unique_ptr<std::atomic<CommitEntry64b>[]>(
425 new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
426 dummy_max_snapshot_.number_ = kMaxSequenceNumber;
427 rollback_deletion_type_callback_ =
428 txn_db_opts.rollback_deletion_type_callback;
429 }
430
431 void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
432 bool locked) {
433 // When max_evicted_seq_ advances, move older entries from prepared_txns_
434 // to delayed_prepared_. This guarantees that if a seq is lower than max,
435 // then it is not in prepared_txns_ and save an expensive, synchronized
436 // lookup from a shared set. delayed_prepared_ is expected to be empty in
437 // normal cases.
438 ROCKS_LOG_DETAILS(
439 info_log_,
440 "CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
441 prepared_txns_.empty(),
442 prepared_txns_.empty() ? 0 : prepared_txns_.top());
443 const SequenceNumber prepared_top = prepared_txns_.top();
444 const bool empty = prepared_top == kMaxSequenceNumber;
445 // Preliminary check to avoid the synchronization cost
446 if (!empty && prepared_top <= new_max) {
447 if (locked) {
448 // Needed to avoid double locking in pop().
449 prepared_txns_.push_pop_mutex()->Unlock();
450 }
451 WriteLock wl(&prepared_mutex_);
452 // Need to fetch fresh values of ::top after mutex is acquired
453 while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
454 auto to_be_popped = prepared_txns_.top();
455 delayed_prepared_.insert(to_be_popped);
456 ROCKS_LOG_WARN(info_log_,
457 "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
458 " new_max=%" PRIu64 ")",
459 static_cast<uint64_t>(delayed_prepared_.size()),
460 to_be_popped, new_max);
461 delayed_prepared_empty_.store(false, std::memory_order_release);
462 // Update prepared_txns_ after updating delayed_prepared_empty_ otherwise
463 // there will be a point in time that the entry is neither in
464 // prepared_txns_ nor in delayed_prepared_, which will not be checked if
465 // delayed_prepared_empty_ is false.
466 prepared_txns_.pop();
467 }
468 if (locked) {
469 prepared_txns_.push_pop_mutex()->Lock();
470 }
471 }
472 }
473
474 void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
475 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
476 seq, max_evicted_seq_.load());
477 TEST_SYNC_POINT("AddPrepared::begin:pause");
478 TEST_SYNC_POINT("AddPrepared::begin:resume");
479 if (!locked) {
480 prepared_txns_.push_pop_mutex()->Lock();
481 }
482 prepared_txns_.push_pop_mutex()->AssertHeld();
483 prepared_txns_.push(seq);
484 auto new_max = future_max_evicted_seq_.load();
485 if (UNLIKELY(seq <= new_max)) {
486 // This should not happen in normal case
487 ROCKS_LOG_ERROR(
488 info_log_,
489 "Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
490 " <= %" PRIu64,
491 seq, new_max);
492 CheckPreparedAgainstMax(new_max, true /*locked*/);
493 }
494 if (!locked) {
495 prepared_txns_.push_pop_mutex()->Unlock();
496 }
497 TEST_SYNC_POINT("AddPrepared::end");
498 }
499
500 void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
501 uint8_t loop_cnt) {
502 ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
503 prepare_seq, commit_seq);
504 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
505 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
506 auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
507 CommitEntry64b evicted_64b;
508 CommitEntry evicted;
509 bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
510 if (LIKELY(to_be_evicted)) {
511 assert(evicted.prep_seq != prepare_seq);
512 auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
513 ROCKS_LOG_DETAILS(info_log_,
514 "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
515 evicted.prep_seq, evicted.commit_seq, prev_max);
516 if (prev_max < evicted.commit_seq) {
517 auto last = db_impl_->GetLastPublishedSequence(); // could be 0
518 SequenceNumber max_evicted_seq;
519 if (LIKELY(evicted.commit_seq < last)) {
520 assert(last > 0);
521 // Inc max in larger steps to avoid frequent updates
522 max_evicted_seq =
523 std::min(evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED, last - 1);
524 } else {
525 // legit when a commit entry in a write batch overwrite the previous one
526 max_evicted_seq = evicted.commit_seq;
527 }
528 #ifdef OS_LINUX
529 if (rocksdb_write_prepared_TEST_ShouldClearCommitCache &&
530 rocksdb_write_prepared_TEST_ShouldClearCommitCache()) {
531 max_evicted_seq = last;
532 }
533 #endif // OS_LINUX
534 ROCKS_LOG_DETAILS(info_log_,
535 "%lu Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64
536 " => %lu",
537 prepare_seq, evicted.prep_seq, evicted.commit_seq,
538 prev_max, max_evicted_seq);
539 AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
540 }
541 if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
542 WriteLock wl(&prepared_mutex_);
543 auto dp_iter = delayed_prepared_.find(evicted.prep_seq);
544 if (dp_iter != delayed_prepared_.end()) {
545 // This is a rare case that txn is committed but prepared_txns_ is not
546 // cleaned up yet. Refer to delayed_prepared_commits_ definition for
547 // why it should be kept updated.
548 delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
549 ROCKS_LOG_DEBUG(info_log_,
550 "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
551 evicted.prep_seq, evicted.commit_seq);
552 }
553 }
554 // After each eviction from commit cache, check if the commit entry should
555 // be kept around because it overlaps with a live snapshot.
556 CheckAgainstSnapshots(evicted);
557 }
558 bool succ =
559 ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
560 if (UNLIKELY(!succ)) {
561 ROCKS_LOG_ERROR(info_log_,
562 "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
563 ",%" PRIu64 " retrying...",
564 indexed_seq, prepare_seq, commit_seq);
565 // A very rare event, in which the commit entry is updated before we do.
566 // Here we apply a very simple solution of retrying.
567 if (loop_cnt > 100) {
568 throw std::runtime_error("Infinite loop in AddCommitted!");
569 }
570 AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
571 return;
572 }
573 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
574 TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
575 }
576
577 void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
578 const size_t batch_cnt) {
579 TEST_SYNC_POINT_CALLBACK(
580 "RemovePrepared:Start",
581 const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq)));
582 TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause");
583 TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume");
584 ROCKS_LOG_DETAILS(info_log_,
585 "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt,
586 prepare_seq, batch_cnt);
587 WriteLock wl(&prepared_mutex_);
588 for (size_t i = 0; i < batch_cnt; i++) {
589 prepared_txns_.erase(prepare_seq + i);
590 bool was_empty = delayed_prepared_.empty();
591 if (!was_empty) {
592 delayed_prepared_.erase(prepare_seq + i);
593 auto it = delayed_prepared_commits_.find(prepare_seq + i);
594 if (it != delayed_prepared_commits_.end()) {
595 ROCKS_LOG_DETAILS(info_log_, "delayed_prepared_commits_.erase %" PRIu64,
596 prepare_seq + i);
597 delayed_prepared_commits_.erase(it);
598 }
599 bool is_empty = delayed_prepared_.empty();
600 if (was_empty != is_empty) {
601 delayed_prepared_empty_.store(is_empty, std::memory_order_release);
602 }
603 }
604 }
605 }
606
607 bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
608 CommitEntry64b* entry_64b,
609 CommitEntry* entry) const {
610 *entry_64b = commit_cache_[static_cast<size_t>(indexed_seq)].load(
611 std::memory_order_acquire);
612 bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
613 return valid;
614 }
615
616 bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
617 const CommitEntry& new_entry,
618 CommitEntry* evicted_entry) {
619 CommitEntry64b new_entry_64b(new_entry, FORMAT);
620 CommitEntry64b evicted_entry_64b =
621 commit_cache_[static_cast<size_t>(indexed_seq)].exchange(
622 new_entry_64b, std::memory_order_acq_rel);
623 bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
624 return valid;
625 }
626
627 bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
628 CommitEntry64b& expected_entry_64b,
629 const CommitEntry& new_entry) {
630 auto& atomic_entry = commit_cache_[static_cast<size_t>(indexed_seq)];
631 CommitEntry64b new_entry_64b(new_entry, FORMAT);
632 bool succ = atomic_entry.compare_exchange_strong(
633 expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
634 std::memory_order_acquire);
635 return succ;
636 }
637
638 void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
639 const SequenceNumber& new_max) {
640 ROCKS_LOG_DETAILS(info_log_,
641 "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
642 prev_max, new_max);
643 // Declare the intention before getting snapshot from the DB. This helps a
644 // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if
645 // it has not already. Otherwise the new snapshot is when we ask DB for
646 // snapshots smaller than future max.
647 auto updated_future_max = prev_max;
648 while (updated_future_max < new_max &&
649 !future_max_evicted_seq_.compare_exchange_weak(
650 updated_future_max, new_max, std::memory_order_acq_rel,
651 std::memory_order_relaxed)) {
652 };
653
654 CheckPreparedAgainstMax(new_max, false /*locked*/);
655
656 // With each change to max_evicted_seq_ fetch the live snapshots behind it.
657 // We use max as the version of snapshots to identify how fresh are the
658 // snapshot list. This works because the snapshots are between 0 and
659 // max, so the larger the max, the more complete they are.
660 SequenceNumber new_snapshots_version = new_max;
661 std::vector<SequenceNumber> snapshots;
662 bool update_snapshots = false;
663 if (new_snapshots_version > snapshots_version_) {
664 // This is to avoid updating the snapshots_ if it already updated
665 // with a more recent vesion by a concrrent thread
666 update_snapshots = true;
667 // We only care about snapshots lower then max
668 snapshots = GetSnapshotListFromDB(new_max);
669 }
670 if (update_snapshots) {
671 UpdateSnapshots(snapshots, new_snapshots_version);
672 if (!snapshots.empty()) {
673 WriteLock wl(&old_commit_map_mutex_);
674 for (auto snap : snapshots) {
675 // This allows IsInSnapshot to tell apart the reads from in valid
676 // snapshots from the reads from committed values in valid snapshots.
677 old_commit_map_[snap];
678 }
679 old_commit_map_empty_.store(false, std::memory_order_release);
680 }
681 }
682 auto updated_prev_max = prev_max;
683 TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
684 TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
685 while (updated_prev_max < new_max &&
686 !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
687 std::memory_order_acq_rel,
688 std::memory_order_relaxed)) {
689 };
690 }
691
692 const Snapshot* WritePreparedTxnDB::GetSnapshot() {
693 const bool kForWWConflictCheck = true;
694 return GetSnapshotInternal(!kForWWConflictCheck);
695 }
696
697 SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
698 bool for_ww_conflict_check) {
699 // Note: for this optimization setting the last sequence number and obtaining
700 // the smallest uncommitted seq should be done atomically. However to avoid
701 // the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
702 // snapshot. Since we always updated the list of unprepared seq (via
703 // AddPrepared) AFTER the last sequence is updated, this guarantees that the
704 // smallest uncommitted seq that we pair with the snapshot is smaller or equal
705 // the value that would be obtained otherwise atomically. That is ok since
706 // this optimization works as long as min_uncommitted is less than or equal
707 // than the smallest uncommitted seq when the snapshot was taken.
708 auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
709 SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
710 TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
711 assert(snap_impl);
712 SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
713 // Note: Check against future_max_evicted_seq_ (in contrast with
714 // max_evicted_seq_) in case there is a concurrent AdvanceMaxEvictedSeq.
715 if (UNLIKELY(snap_seq != 0 && snap_seq <= future_max_evicted_seq_)) {
716 // There is a very rare case in which the commit entry evicts another commit
717 // entry that is not published yet thus advancing max evicted seq beyond the
718 // last published seq. This case is not likely in real-world setup so we
719 // handle it with a few retries.
720 size_t retry = 0;
721 SequenceNumber max;
722 while ((max = future_max_evicted_seq_.load()) != 0 &&
723 snap_impl->GetSequenceNumber() <= max && retry < 100) {
724 ROCKS_LOG_WARN(info_log_,
725 "GetSnapshot snap: %" PRIu64 " max: %" PRIu64
726 " retry %" ROCKSDB_PRIszt,
727 snap_impl->GetSequenceNumber(), max, retry);
728 ReleaseSnapshot(snap_impl);
729 // Wait for last visible seq to catch up with max, and also go beyond it
730 // by one.
731 AdvanceSeqByOne();
732 snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
733 assert(snap_impl);
734 retry++;
735 }
736 assert(snap_impl->GetSequenceNumber() > max);
737 if (snap_impl->GetSequenceNumber() <= max) {
738 throw std::runtime_error(
739 "Snapshot seq " + std::to_string(snap_impl->GetSequenceNumber()) +
740 " after " + std::to_string(retry) +
741 " retries is still less than futre_max_evicted_seq_" +
742 std::to_string(max));
743 }
744 }
745 EnhanceSnapshot(snap_impl, min_uncommitted);
746 ROCKS_LOG_DETAILS(
747 db_impl_->immutable_db_options().info_log,
748 "GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
749 snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
750 TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
751 return snap_impl;
752 }
753
754 void WritePreparedTxnDB::AdvanceSeqByOne() {
755 // Inserting an empty value will i) let the max evicted entry to be
756 // published, i.e., max == last_published, increase the last published to
757 // be one beyond max, i.e., max < last_published.
758 WriteOptions woptions;
759 TransactionOptions txn_options;
760 Transaction* txn0 = BeginTransaction(woptions, txn_options, nullptr);
761 std::hash<std::thread::id> hasher;
762 char name[64];
763 snprintf(name, 64, "txn%" ROCKSDB_PRIszt, hasher(std::this_thread::get_id()));
764 assert(strlen(name) < 64 - 1);
765 Status s = txn0->SetName(name);
766 assert(s.ok());
767 if (s.ok()) {
768 // Without prepare it would simply skip the commit
769 s = txn0->Prepare();
770 }
771 assert(s.ok());
772 if (s.ok()) {
773 s = txn0->Commit();
774 }
775 assert(s.ok());
776 delete txn0;
777 }
778
779 const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
780 SequenceNumber max) {
781 ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
782 InstrumentedMutexLock dblock(db_impl_->mutex());
783 db_impl_->mutex()->AssertHeld();
784 return db_impl_->snapshots().GetAll(nullptr, max);
785 }
786
787 void WritePreparedTxnDB::ReleaseSnapshotInternal(
788 const SequenceNumber snap_seq) {
789 // TODO(myabandeh): relax should enough since the synchronizatin is already
790 // done by snapshots_mutex_ under which this function is called.
791 if (snap_seq <= max_evicted_seq_.load(std::memory_order_acquire)) {
792 // Then this is a rare case that transaction did not finish before max
793 // advances. It is expected for a few read-only backup snapshots. For such
794 // snapshots we might have kept around a couple of entries in the
795 // old_commit_map_. Check and do garbage collection if that is the case.
796 bool need_gc = false;
797 {
798 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
799 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
800 snap_seq);
801 ReadLock rl(&old_commit_map_mutex_);
802 auto prep_set_entry = old_commit_map_.find(snap_seq);
803 need_gc = prep_set_entry != old_commit_map_.end();
804 }
805 if (need_gc) {
806 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
807 ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
808 snap_seq);
809 WriteLock wl(&old_commit_map_mutex_);
810 old_commit_map_.erase(snap_seq);
811 old_commit_map_empty_.store(old_commit_map_.empty(),
812 std::memory_order_release);
813 }
814 }
815 }
816
817 void WritePreparedTxnDB::CleanupReleasedSnapshots(
818 const std::vector<SequenceNumber>& new_snapshots,
819 const std::vector<SequenceNumber>& old_snapshots) {
820 auto newi = new_snapshots.begin();
821 auto oldi = old_snapshots.begin();
822 for (; newi != new_snapshots.end() && oldi != old_snapshots.end();) {
823 assert(*newi >= *oldi); // cannot have new snapshots with lower seq
824 if (*newi == *oldi) { // still not released
825 auto value = *newi;
826 while (newi != new_snapshots.end() && *newi == value) {
827 newi++;
828 }
829 while (oldi != old_snapshots.end() && *oldi == value) {
830 oldi++;
831 }
832 } else {
833 assert(*newi > *oldi); // *oldi is released
834 ReleaseSnapshotInternal(*oldi);
835 oldi++;
836 }
837 }
838 // Everything remained in old_snapshots is released and must be cleaned up
839 for (; oldi != old_snapshots.end(); oldi++) {
840 ReleaseSnapshotInternal(*oldi);
841 }
842 }
843
844 void WritePreparedTxnDB::UpdateSnapshots(
845 const std::vector<SequenceNumber>& snapshots,
846 const SequenceNumber& version) {
847 ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
848 version);
849 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
850 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
851 #ifndef NDEBUG
852 size_t sync_i = 0;
853 #endif
854 ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
855 WriteLock wl(&snapshots_mutex_);
856 snapshots_version_ = version;
857 // We update the list concurrently with the readers.
858 // Both new and old lists are sorted and the new list is subset of the
859 // previous list plus some new items. Thus if a snapshot repeats in
860 // both new and old lists, it will appear upper in the new list. So if
861 // we simply insert the new snapshots in order, if an overwritten item
862 // is still valid in the new list is either written to the same place in
863 // the array or it is written in a higher palce before it gets
864 // overwritten by another item. This guarantess a reader that reads the
865 // list bottom-up will eventaully see a snapshot that repeats in the
866 // update, either before it gets overwritten by the writer or
867 // afterwards.
868 size_t i = 0;
869 auto it = snapshots.begin();
870 for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; ++it, ++i) {
871 snapshot_cache_[i].store(*it, std::memory_order_release);
872 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
873 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
874 }
875 #ifndef NDEBUG
876 // Release the remaining sync points since they are useless given that the
877 // reader would also use lock to access snapshots
878 for (++sync_i; sync_i <= 10; ++sync_i) {
879 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
880 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
881 }
882 #endif
883 snapshots_.clear();
884 for (; it != snapshots.end(); ++it) {
885 // Insert them to a vector that is less efficient to access
886 // concurrently
887 snapshots_.push_back(*it);
888 }
889 // Update the size at the end. Otherwise a parallel reader might read
890 // items that are not set yet.
891 snapshots_total_.store(snapshots.size(), std::memory_order_release);
892
893 // Note: this must be done after the snapshots data structures are updated
894 // with the new list of snapshots.
895 CleanupReleasedSnapshots(snapshots, snapshots_all_);
896 snapshots_all_ = snapshots;
897
898 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
899 TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
900 }
901
902 void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
903 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
904 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
905 #ifndef NDEBUG
906 size_t sync_i = 0;
907 #endif
908 // First check the snapshot cache that is efficient for concurrent access
909 auto cnt = snapshots_total_.load(std::memory_order_acquire);
910 // The list might get updated concurrently as we are reading from it. The
911 // reader should be able to read all the snapshots that are still valid
912 // after the update. Since the survived snapshots are written in a higher
913 // place before gets overwritten the reader that reads bottom-up will
914 // eventully see it.
915 const bool next_is_larger = true;
916 // We will set to true if the border line snapshot suggests that.
917 bool search_larger_list = false;
918 size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
919 for (; 0 < ip1; ip1--) {
920 SequenceNumber snapshot_seq =
921 snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
922 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
923 ++sync_i);
924 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
925 if (ip1 == SNAPSHOT_CACHE_SIZE) { // border line snapshot
926 // snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
927 // then later also continue the search to larger snapshots
928 search_larger_list = snapshot_seq < evicted.commit_seq;
929 }
930 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
931 snapshot_seq, !next_is_larger)) {
932 break;
933 }
934 }
935 #ifndef NDEBUG
936 // Release the remaining sync points before accquiring the lock
937 for (++sync_i; sync_i <= 10; ++sync_i) {
938 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
939 TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
940 }
941 #endif
942 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
943 TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
944 if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
945 // Then access the less efficient list of snapshots_
946 WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
947 ROCKS_LOG_WARN(info_log_,
948 "snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
949 "> with %" ROCKSDB_PRIszt " snapshots",
950 evicted.prep_seq, evicted.commit_seq, cnt);
951 ReadLock rl(&snapshots_mutex_);
952 // Items could have moved from the snapshots_ to snapshot_cache_ before
953 // accquiring the lock. To make sure that we do not miss a valid snapshot,
954 // read snapshot_cache_ again while holding the lock.
955 for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
956 SequenceNumber snapshot_seq =
957 snapshot_cache_[i].load(std::memory_order_acquire);
958 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
959 snapshot_seq, next_is_larger)) {
960 break;
961 }
962 }
963 for (auto snapshot_seq_2 : snapshots_) {
964 if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
965 snapshot_seq_2, next_is_larger)) {
966 break;
967 }
968 }
969 }
970 }
971
972 bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
973 const uint64_t& prep_seq, const uint64_t& commit_seq,
974 const uint64_t& snapshot_seq, const bool next_is_larger = true) {
975 // If we do not store an entry in old_commit_map_ we assume it is committed in
976 // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
977 // the snapshot so we need not to keep the entry around for this snapshot.
978 if (commit_seq <= snapshot_seq) {
979 // continue the search if the next snapshot could be smaller than commit_seq
980 return !next_is_larger;
981 }
982 // then snapshot_seq < commit_seq
983 if (prep_seq <= snapshot_seq) { // overlapping range
984 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
985 ROCKS_LOG_WARN(info_log_,
986 "old_commit_map_mutex_ overhead for %" PRIu64
987 " commit entry: <%" PRIu64 ",%" PRIu64 ">",
988 snapshot_seq, prep_seq, commit_seq);
989 WriteLock wl(&old_commit_map_mutex_);
990 old_commit_map_empty_.store(false, std::memory_order_release);
991 auto& vec = old_commit_map_[snapshot_seq];
992 vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
993 // We need to store it once for each overlapping snapshot. Returning true to
994 // continue the search if there is more overlapping snapshot.
995 return true;
996 }
997 // continue the search if the next snapshot could be larger than prep_seq
998 return next_is_larger;
999 }
1000
1001 WritePreparedTxnDB::~WritePreparedTxnDB() {
1002 // At this point there could be running compaction/flush holding a
1003 // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
1004 // Make sure those jobs finished before destructing WritePreparedTxnDB.
1005 if (!db_impl_->shutting_down_) {
1006 db_impl_->CancelAllBackgroundWork(true /*wait*/);
1007 }
1008 }
1009
1010 void SubBatchCounter::InitWithComp(const uint32_t cf) {
1011 auto cmp = comparators_[cf];
1012 keys_[cf] = CFKeys(SetComparator(cmp));
1013 }
1014
1015 void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
1016 CFKeys& cf_keys = keys_[cf];
1017 if (cf_keys.size() == 0) { // just inserted
1018 InitWithComp(cf);
1019 }
1020 auto it = cf_keys.insert(key);
1021 if (it.second == false) { // second is false if a element already existed.
1022 batches_++;
1023 keys_.clear();
1024 InitWithComp(cf);
1025 keys_[cf].insert(key);
1026 }
1027 }
1028
1029 } // namespace ROCKSDB_NAMESPACE
1030 #endif // ROCKSDB_LITE