// ceph/src/rocksdb/utilities/transactions/write_prepared_txn.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace rocksdb {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options),
      wpt_db_(txn_db) {}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}

Status WritePreparedTxn::Get(const ReadOptions& read_options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  auto snapshot = read_options.snapshot;
  auto snap_seq =
      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;  // by default disable the optimization
  if (snapshot != nullptr) {
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }

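  // The read callback decides visibility for this lookup: an entry is
  // returned only if it is committed with respect to snap_seq. Sequence
  // numbers below min_uncommitted are known to be committed, which lets the
  // visibility check short-circuit for them.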
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
                                        pinnable_val, &callback);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}

Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT);
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq and readers
  // assuming the data is committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PrepareHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
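  // This write puts the prepare batch in the WAL and, since the memtable is
  // not disabled here, also in the memtable. seq_used returns the sequence
  // assigned to the first sub-batch, which is then used as the txn id.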
  Status s = db_impl_->WriteImpl(
      write_options, GetWriteBatch()->GetWriteBatch(),
      /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
      &seq_used, prepare_batch_cnt_, &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}

Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}

Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  const bool publish_seq = do_one_write;
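  // A single write suffices when there is only one write queue, or when the
  // commit-time batch does not go to the memtable (it is empty or kept only
  // for recovery); in that case the same write also publishes the sequence.
  // Otherwise the sequence is published by a second write further below.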
  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written
  // to DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // CommitTimeWriteBatch commits with PreReleaseCallback.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
      publish_seq);
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, &update_commit_map);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet we cannot
  // enable the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
        : db_impl_(db_impl) {}
    virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
#ifdef NDEBUG
      (void)is_mem_disabled;
#endif
      assert(is_mem_disabled);
      assert(db_impl_->immutable_db_options().two_write_queues);
      db_impl_->SetLastPublishedSequence(seq);
      return Status::OK();
    }

   private:
    DBImpl* db_impl_;
  } publish_seq_callback(db_impl_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &publish_seq_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
  return s;
}

Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  // In WritePrepared, the txn id is the same as the prepare seq
  auto last_visible_txn = GetId() - 1;
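  // The handler below builds the rollback batch: for each key written by the
  // prepared batch it reads the value as of last_visible_txn, i.e. the state
  // before this transaction, and re-puts that value, or issues a Delete if
  // the key had no committed value, so the rollback cancels out the prepared
  // writes.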
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    ReadOptions roptions;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands)
        : db_(db),
          callback(wpt_db, snap_seq,
                   0),  // 0 disables the min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands) {}

    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      if (it.second ==
          false) {  // second is false if an element already existed.
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
                       &callback);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    virtual bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  // We commit the rolled-back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to the CommitCache lets us benefit from the existing mechanism in
  // CommitCache that keeps an entry that was evicted due to max advance, yet
  // overlaps with a live snapshot, around so that the live snapshot properly
  // skips the entry even if its prepare seq is lower than max_evicted_seq_.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t& prepare_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back
  uint64_t& rollback_seq = seq_used;
  if (s.ok()) {
    // Note: it is safe to do this after the PreReleaseCallback via WriteImpl
    // since all the writes by the prepared batch are already blinded by the
    // rollback batch. The only reason we commit the prepared batch here is to
    // benefit from the existing mechanism in CommitCache that takes care of
    // the rare cases where the prepare seq is visible to a snapshot but max
    // evicted seq advances past that prepare seq.
    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
      wpt_db_->AddCommitted(GetId() + i, rollback_seq);
    }
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }

  return s;
}

Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker);
}

void WritePreparedTxn::SetSnapshot() {
  // Note: for this optimization, setting the last sequence number and
  // obtaining the smallest uncommitted seq should be done atomically. However,
  // to avoid the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking
  // the snapshot. Since we always update the list of unprepared seqs (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would otherwise be obtained atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
  const bool FOR_WW_CONFLICT_CHECK = true;
  SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
  assert(snapshot);
  wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
  SetSnapshotInternal(snapshot);
}

Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE