// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn_db.h"

#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Instead of reconstructing a Transaction object, and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (eg. updating CommitMap, reconstructing keyset)
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.

  WriteOptions w_options;

  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as snapshot is the seq right before we have locked
      // and wrote to it, so whatever is there, it is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet.)
    void Refresh(SequenceNumber) override {}
  };
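
  // Note: this callback is used below with snapshot == last_visible_txn (the
  // sequence number just before the recovered batch's first write), so a read
  // through it returns whatever value existed before the transaction,
  // treating every version at or below that seq as committed.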

  // Iterate starting with largest sequence number.
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch;
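
    // The handler below walks the recovered batch key by key: for each key it
    // reads the value as of last_visible_txn and appends a Put of that old
    // value (or a Delete if no value existed) to rollback_batch.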

    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // just inserted
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (res.second ==
            false) {  // second is false if an element already existed.
          return s;
        }

        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        } else if (s.IsNotFound()) {
          // There has been no readable value before txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }
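
      // Illustrative example (not from the source): if the recovered batch
      // contains Put(a, v2) at seq 10 and the pre-transaction state was
      // a = v1, the GetImpl above (reading as of seq 9, with everything
      // treated as committed) yields v1, and we append Put(a, v1); if `a` did
      // not exist before the transaction, we append Delete(a) instead.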

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator
    WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
    if (!s.ok()) {
      return s;
    }
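
    // Note the double negation: passing !kDisableMemtable means the rollback
    // batch is applied to the memtable as well as the WAL, and kOneBatch
    // tells WriteImpl to count it as a single sub-batch.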

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));
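
  // AddCommitted(commit_seq, commit_seq) above records a sub-batch whose
  // prepare and commit sequence numbers coincide, i.e. it is considered
  // committed the moment it is written.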

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }

  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations is required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt =
        static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }

    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
  }
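
  // Each sub-batch consumes one sequence number: a prepare batch at seq `s`
  // with batch_cnt_ `n` occupies seqs [s, s + n), so the loop below marks
  // every one of those seqs as prepared, in ascending order.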
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  Status s;
  // Rollback unprepared transactions.
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
    }
  }

  dbimpl->DeleteAllRecoveredTransactions();

  // Compaction should start only after max_evicted_seq_ is set AND recovered
  // transactions are either added to PrepareHeap or rolled back.
  s = EnableAutoCompaction(compaction_enabled_cf_handles);

  return s;
}

Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }
}
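
// Usage sketch (illustrative, not part of this file): callers may recycle a
// finished transaction object to avoid reallocation:
//
//   Transaction* txn = txn_db->BeginTransaction(w_opts, t_opts, nullptr);
//   // ... commit or roll back txn ...
//   txn = txn_db->BeginTransaction(w_opts, t_opts, txn);  // reuses txn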

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};
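
// Holding the ManagedSnapshot shared_ptr inside IteratorState keeps a
// db-owned snapshot alive for the iterator's whole lifetime; the cleanup
// function below deletes the state (and thus releases the snapshot) when the
// iterator is destroyed.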

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if snapshot sequence is 3, and we have the following keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
  // which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then returning the first non-visible value
  // would be incorrect, as we should return v5, and not v3. The problem is
  // that there are committed values at snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be unprepared values. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  const Snapshot* snapshot = nullptr;
  if (options.snapshot == nullptr) {
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  } else {
    snapshot = options.snapshot;
  }

  snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // commit_seq <= largest_validated_seq <= snapshot_seq, and hence
  // commit_seq <= snapshot_seq. As explained above, the problem with Prev()
  // only happens when snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction,
  // largest_validated_seq_ is meaningless, and Prev() should just work with
  // the existing visibility logic.
  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }

  min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
          ->min_uncommitted_;
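
  // min_uncommitted is a lower bound on uncommitted sequence numbers: any seq
  // below it is known to be committed, letting the read callback skip
  // commit-map lookups for sufficiently old values.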

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
                                &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
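
  // Note the negated flags: !ALLOW_BLOB and !ALLOW_REFRESH pass false,
  // disabling blob reads and iterator Refresh() here, presumably since a
  // refresh could move the iterator past the snapshot the callback assumes.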
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE