// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace rocksdb {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options),
      wpt_db_(txn_db) {}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}

Status WritePreparedTxn::Get(const ReadOptions& read_options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  auto snapshot = read_options.snapshot;
  auto snap_seq =
      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;  // by default disable the optimization
  if (snapshot != nullptr) {
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }

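  // Note: visibility is decided by the read callback. A version with sequence
  // number seq is returned only if IsInSnapshot(seq, snap_seq,
  // min_uncommitted) reports it as committed before the snapshot was taken.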
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
                                        pinnable_val, &callback);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}

Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
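  // Note: the prepared batch must reach the WAL since recovery relies on the
  // WAL entry to reconstruct the transaction, so disableWAL is overridden.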
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT);
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called from the pre-release callback; otherwise
  // there is a non-zero chance of max advancing beyond prepare_seq, in which
  // case readers would assume the data is committed.
  // Having it in the PreReleaseCallback also allows in-order addition of
  // prepared entries to PrepareHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  Status s = db_impl_->WriteImpl(
      write_options, GetWriteBatch()->GetWriteBatch(),
      /*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
      &seq_used, prepare_batch_cnt_, &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}

Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}

Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetId());
  // We take the commit-time batch and append the Commit marker.
  // The memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
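  // Note: with two write queues, a batch that must also go to the memtable
  // cannot publish the sequence in the same write; in that case the sequence
  // is published by a second write further below.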
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  const bool publish_seq = do_one_write;
  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // CommitTimeWriteBatch commits with PreReleaseCallback.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
      publish_seq);
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
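  // Note: even when the commit-time batch carries no data, the write still
  // consumes one sequence number for the commit marker, hence a count of one.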
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, &update_commit_map);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by CommitTimeWriteBatch and yet we cannot enable
  // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
        : db_impl_(db_impl) {}
    virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
#ifdef NDEBUG
      (void)is_mem_disabled;
#endif
      assert(is_mem_disabled);
      assert(db_impl_->immutable_db_options().two_write_queues);
      db_impl_->SetLastPublishedSequence(seq);
      return Status::OK();
    }

   private:
    DBImpl* db_impl_;
  } publish_seq_callback(db_impl_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
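  // LogData entries go to the WAL only and are ignored by the memtable;
  // presumably this keeps the otherwise-empty batch from being treated as a
  // no-op.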
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &publish_seq_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
  wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
  return s;
}

Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  // In WritePrepared, the txn id is the same as the prepare seq
  auto last_visible_txn = GetId() - 1;
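  // Note: the handler below builds a batch that restores, for each key written
  // by this txn, the value that was visible at last_visible_txn (or a Delete
  // if there was none), thereby canceling out the prepared writes.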
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    ReadOptions roptions;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands)
        : db_(db),
          callback(wpt_db, snap_seq,
                   0),  // 0 disables min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands) {}

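    // Each key is rolled back at most once: keys are recorded in a per-CF
    // ordered set, and duplicates appearing later in the batch are skipped.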
    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      if (it.second == false) {
        // second is false if an element already existed.
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
                       &callback);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    virtual bool WriteAfterCommit() const override { return false; }
  } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps an entry around if it was evicted due to max
  // advance yet still overlaps with a live snapshot, so that the live snapshot
  // properly skips the entry even if its prepare seq is lower than
  // max_evicted_seq_.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t& prepare_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back
  uint64_t& rollback_seq = seq_used;
  if (s.ok()) {
    // Note: it is safe to do it after PreReleaseCallback via WriteImpl since
    // all the writes by the prepared batch are already blinded by the rollback
    // batch. The only reason we commit the prepared batch here is to benefit
    // from the existing mechanism in CommitCache that takes care of the rare
    // cases that the prepare seq is visible to a snapshot but max evicted seq
    // advances that prepare seq.
    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
      wpt_db_->AddCommitted(GetId() + i, rollback_seq);
    }
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }

  return s;
}

Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(
          snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker);
}

void WritePreparedTxn::SetSnapshot() {
  // Note: for this optimization, setting the last sequence number and
  // obtaining the smallest uncommitted seq should be done atomically. However,
  // to avoid the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking
  // the snapshot. Since we always update the list of prepared seqs (via
  // AddPrepared) AFTER the last sequence is updated, this guarantees that the
  // smallest uncommitted seq that we pair with the snapshot is smaller than or
  // equal to the value that would otherwise be obtained atomically. That is ok
  // since this optimization works as long as min_uncommitted is less than or
  // equal to the smallest uncommitted seq when the snapshot was taken.
  auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
  const bool FOR_WW_CONFLICT_CHECK = true;
  SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
  assert(snapshot);
  wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
  SetSnapshotInternal(snapshot);
}

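// Rebuilds the transaction from a write batch, e.g., when a prepared
// transaction is recovered from the WAL. The sub-batch count is recomputed
// since duplicate keys split the prepared batch into sub-batches.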
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE