// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace rocksdb {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside the PessimisticTransaction constructor; otherwise
  // it would skip the functions overridden in WritePreparedTxn, which are not
  // yet in effect during the PessimisticTransaction constructor.
  Initialize(txn_options);
}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}

Status WritePreparedTxn::Get(const ReadOptions& read_options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  auto snapshot = read_options.snapshot;
  auto snap_seq =
      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
  SequenceNumber min_uncommitted =
      kMinUnCommittedSeq;  // by default disable the optimization
  if (snapshot != nullptr) {
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }

  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
                                        pinnable_val, &callback);
}
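
// Illustrative usage sketch (not part of this file; `db` and `txn` are
// hypothetical): a transactional read consults the transaction's own write
// batch first and then the DB, with the callback built above deciding which
// DB entries are visible.
//
//   ReadOptions ropts;
//   ropts.snapshot = txn->GetSnapshot();
//   PinnableSlice value;
//   Status s = txn->Get(ropts, db->DefaultColumnFamily(), Slice("key"),
//                       &value);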

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
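
// The two overloads above return base+delta iterators: NewIteratorWithBase
// layers the transaction's indexed write batch (delta) over the DB iterator
// (base), so this transaction's uncommitted writes shadow committed DB values
// during iteration.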

Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT);
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  Status s = db_impl_->WriteImpl(
      write_options, GetWriteBatch()->GetWriteBatch(),
      /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
      &seq_used, prepare_batch_cnt_, &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}
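
// Summary of the numbering scheme used above (descriptive, not normative):
// WriteImpl consumes one sequence number per sub-batch, so a prepared batch
// with duplicate keys occupies [prepare_seq, prepare_seq + prepare_batch_cnt_
// - 1], and the transaction id is set to the first sequence in that range.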

Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}

Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
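  // Descriptive note: a single write suffices when either (a) two_write_queues
  // is off, or (b) the commit-time batch carries no data for the memtable;
  // otherwise a second write (below) is needed to publish the sequence.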
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    if (UNLIKELY(!do_one_write)) {
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by CommitTimeWriteBatch and yet we cannot enable
  // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
  wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
  return s;
}
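
// A minimal caller-side sketch of the two-phase flow implemented above
// (illustrative only; assumes `txn_db` is a TransactionDB opened with
// write_policy = WRITE_PREPARED):
//
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions());
//   txn->SetName("xid-1");      // a name is required before Prepare
//   txn->Put("key", "value");
//   Status s = txn->Prepare();  // routed to PrepareInternal above
//   if (s.ok()) {
//     s = txn->Commit();        // routed to CommitInternal above
//   }
//   delete txn;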

Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  auto read_at_seq = kMaxSequenceNumber;
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    ReadOptions roptions;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands)
        : db_(db),
          callback(wpt_db, snap_seq),  // disable min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands) {}

    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      // it.second is false if an element already existed.
      if (it.second == false) {
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
                       &callback);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }
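
    // Descriptive note on Rollback() above: reading at read_at_seq
    // (kMaxSequenceNumber) through WritePreparedTxnReadCallback skips this
    // transaction's prepared-but-uncommitted entries, so GetImpl returns the
    // last committed pre-transaction value; Put re-writes that value, and
    // Delete covers keys that had no committed value at all.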

    Status PutCF(uint32_t cf, const Slice& key,
                 const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  const bool kFirstPrepareBatch = true;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to the CommitCache lets us benefit from its existing mechanism for
  // keeping an entry around when it was evicted due to max advance yet still
  // overlaps with a live snapshot, so that the live snapshot properly skips
  // the entry even if its prepare seq is lower than max_evicted_seq_.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, ONE_BATCH,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t rollback_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
                    rollback_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal (status=%s) commit: %" PRIu64,
                    s.ToString().c_str(), GetId());
  if (s.ok()) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

  return s;
}
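
// Descriptive summary of RollbackInternal above: the rollback batch re-writes
// the pre-transaction image of every touched key, after which the original
// prepared batches are committed, canceling them out; with two_write_queues a
// second, memtable-less write publishes the sequence, mirroring the two-write
// path in CommitInternal.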

Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq,
                                               false /* cache_only */,
                                               &snap_checker, min_uncommitted);
}
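
// Descriptive note: ValidateSnapshot implements write-write conflict checking;
// the key is in conflict if another writer committed it after this
// transaction's snapshot. min_uncommitted is forwarded so the check can use
// the same visibility shortcut as reads.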

void WritePreparedTxn::SetSnapshot() {
  const bool kForWWConflictCheck = true;
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  SetSnapshotInternal(snapshot);
}
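
// Descriptive note: the internal variant is used so the returned SnapshotImpl
// carries min_uncommitted_, which ValidateSnapshot above reads after casting
// the snapshot back to SnapshotImpl.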

Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE