1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/write_unprepared_txn_db.h"

#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace rocksdb {
18 // Instead of reconstructing a Transaction object, and calling rollback on it,
19 // we can be more efficient with RollbackRecoveredTransaction by skipping
20 // unnecessary steps (eg. updating CommitMap, reconstructing keyset)
21 Status
WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
22 const DBImpl::RecoveredTransaction
* rtxn
) {
23 // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
24 assert(rtxn
->unprepared_
);
25 auto cf_map_shared_ptr
= WritePreparedTxnDB::GetCFHandleMap();
26 auto cf_comp_map_shared_ptr
= WritePreparedTxnDB::GetCFComparatorMap();
27 WriteOptions w_options
;
28 // If we crash during recovery, we can just recalculate and rewrite the
30 w_options
.disableWAL
= true;
32 class InvalidSnapshotReadCallback
: public ReadCallback
{
34 InvalidSnapshotReadCallback(WritePreparedTxnDB
* db
, SequenceNumber snapshot
)
35 : ReadCallback(snapshot
), db_(db
) {}
37 // Will be called to see if the seq number visible; if not it moves on to
38 // the next seq number.
39 inline bool IsVisibleFullCheck(SequenceNumber seq
) override
{
40 // Becomes true if it cannot tell by comparing seq with snapshot seq since
41 // the snapshot is not a real snapshot.
42 auto snapshot
= max_visible_seq_
;
43 bool released
= false;
44 auto ret
= db_
->IsInSnapshot(seq
, snapshot
, min_uncommitted_
, &released
);
45 assert(!released
|| ret
);
50 WritePreparedTxnDB
* db_
;
53 // Iterate starting with largest sequence number.
54 for (auto it
= rtxn
->batches_
.rbegin(); it
!= rtxn
->batches_
.rend(); it
++) {
55 auto last_visible_txn
= it
->first
- 1;
56 const auto& batch
= it
->second
.batch_
;
57 WriteBatch rollback_batch
;
59 struct RollbackWriteBatchBuilder
: public WriteBatch::Handler
{
62 InvalidSnapshotReadCallback callback
;
63 WriteBatch
* rollback_batch_
;
64 std::map
<uint32_t, const Comparator
*>& comparators_
;
65 std::map
<uint32_t, ColumnFamilyHandle
*>& handles_
;
66 using CFKeys
= std::set
<Slice
, SetComparator
>;
67 std::map
<uint32_t, CFKeys
> keys_
;
68 bool rollback_merge_operands_
;
69 RollbackWriteBatchBuilder(
70 DBImpl
* db
, WritePreparedTxnDB
* wpt_db
, SequenceNumber snap_seq
,
71 WriteBatch
* dst_batch
,
72 std::map
<uint32_t, const Comparator
*>& comparators
,
73 std::map
<uint32_t, ColumnFamilyHandle
*>& handles
,
74 bool rollback_merge_operands
)
76 callback(wpt_db
, snap_seq
),
77 // disable min_uncommitted optimization
78 rollback_batch_(dst_batch
),
79 comparators_(comparators
),
81 rollback_merge_operands_(rollback_merge_operands
) {}
83 Status
Rollback(uint32_t cf
, const Slice
& key
) {
85 CFKeys
& cf_keys
= keys_
[cf
];
86 if (cf_keys
.size() == 0) { // just inserted
87 auto cmp
= comparators_
[cf
];
88 keys_
[cf
] = CFKeys(SetComparator(cmp
));
90 auto res
= cf_keys
.insert(key
);
92 false) { // second is false if a element already existed.
96 PinnableSlice pinnable_val
;
98 auto cf_handle
= handles_
[cf
];
99 s
= db_
->GetImpl(roptions
, cf_handle
, key
, &pinnable_val
, ¬_used
,
101 assert(s
.ok() || s
.IsNotFound());
103 s
= rollback_batch_
->Put(cf_handle
, key
, pinnable_val
);
105 } else if (s
.IsNotFound()) {
106 // There has been no readable value before txn. By adding a delete we
107 // make sure that there will be none afterwards either.
108 s
= rollback_batch_
->Delete(cf_handle
, key
);
111 // Unexpected status. Return it to the user.
116 Status
PutCF(uint32_t cf
, const Slice
& key
,
117 const Slice
& /*val*/) override
{
118 return Rollback(cf
, key
);
121 Status
DeleteCF(uint32_t cf
, const Slice
& key
) override
{
122 return Rollback(cf
, key
);
125 Status
SingleDeleteCF(uint32_t cf
, const Slice
& key
) override
{
126 return Rollback(cf
, key
);
129 Status
MergeCF(uint32_t cf
, const Slice
& key
,
130 const Slice
& /*val*/) override
{
131 if (rollback_merge_operands_
) {
132 return Rollback(cf
, key
);
138 // Recovered batches do not contain 2PC markers.
139 Status
MarkNoop(bool) override
{ return Status::InvalidArgument(); }
140 Status
MarkBeginPrepare(bool) override
{
141 return Status::InvalidArgument();
143 Status
MarkEndPrepare(const Slice
&) override
{
144 return Status::InvalidArgument();
146 Status
MarkCommit(const Slice
&) override
{
147 return Status::InvalidArgument();
149 Status
MarkRollback(const Slice
&) override
{
150 return Status::InvalidArgument();
152 } rollback_handler(db_impl_
, this, last_visible_txn
, &rollback_batch
,
153 *cf_comp_map_shared_ptr
.get(), *cf_map_shared_ptr
.get(),
154 txn_db_options_
.rollback_merge_operands
);
156 auto s
= batch
->Iterate(&rollback_handler
);
161 // The Rollback marker will be used as a batch separator
162 WriteBatchInternal::MarkRollback(&rollback_batch
, rtxn
->name_
);
164 const uint64_t kNoLogRef
= 0;
165 const bool kDisableMemtable
= true;
166 const size_t kOneBatch
= 1;
167 uint64_t seq_used
= kMaxSequenceNumber
;
168 s
= db_impl_
->WriteImpl(w_options
, &rollback_batch
, nullptr, nullptr,
169 kNoLogRef
, !kDisableMemtable
, &seq_used
, kOneBatch
);
174 // If two_write_queues, we must manually release the sequence number to
176 if (db_impl_
->immutable_db_options().two_write_queues
) {
177 db_impl_
->SetLastPublishedSequence(seq_used
);
184 Status
WriteUnpreparedTxnDB::Initialize(
185 const std::vector
<size_t>& compaction_enabled_cf_indices
,
186 const std::vector
<ColumnFamilyHandle
*>& handles
) {
187 // TODO(lth): Reduce code duplication in this function.
188 auto dbimpl
= reinterpret_cast<DBImpl
*>(GetRootDB());
189 assert(dbimpl
!= nullptr);
191 db_impl_
->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
192 // A callback to commit a single sub-batch
193 class CommitSubBatchPreReleaseCallback
: public PreReleaseCallback
{
195 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB
* db
)
197 Status
Callback(SequenceNumber commit_seq
,
198 bool is_mem_disabled
__attribute__((__unused__
)),
200 assert(!is_mem_disabled
);
201 db_
->AddCommitted(commit_seq
, commit_seq
);
206 WritePreparedTxnDB
* db_
;
208 db_impl_
->SetRecoverableStatePreReleaseCallback(
209 new CommitSubBatchPreReleaseCallback(this));
211 // PessimisticTransactionDB::Initialize
212 for (auto cf_ptr
: handles
) {
213 AddColumnFamily(cf_ptr
);
216 for (auto handle
: handles
) {
217 ColumnFamilyDescriptor cfd
;
218 Status s
= handle
->GetDescriptor(&cfd
);
222 s
= VerifyCFOptions(cfd
.options
);
228 // Re-enable compaction for the column families that initially had
229 // compaction enabled.
230 std::vector
<ColumnFamilyHandle
*> compaction_enabled_cf_handles
;
231 compaction_enabled_cf_handles
.reserve(compaction_enabled_cf_indices
.size());
232 for (auto index
: compaction_enabled_cf_indices
) {
233 compaction_enabled_cf_handles
.push_back(handles
[index
]);
236 // create 'real' transactions from recovered shell transactions
237 auto rtxns
= dbimpl
->recovered_transactions();
238 for (auto rtxn
: rtxns
) {
239 auto recovered_trx
= rtxn
.second
;
240 assert(recovered_trx
);
241 assert(recovered_trx
->batches_
.size() >= 1);
242 assert(recovered_trx
->name_
.length());
244 // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
245 // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
246 // two iterations is required.
247 if (recovered_trx
->unprepared_
) {
251 WriteOptions w_options
;
252 w_options
.sync
= true;
253 TransactionOptions t_options
;
255 auto first_log_number
= recovered_trx
->batches_
.begin()->second
.log_number_
;
256 auto first_seq
= recovered_trx
->batches_
.begin()->first
;
257 auto last_prepare_batch_cnt
=
258 recovered_trx
->batches_
.begin()->second
.batch_cnt_
;
260 Transaction
* real_trx
= BeginTransaction(w_options
, t_options
, nullptr);
263 static_cast_with_check
<WriteUnpreparedTxn
, Transaction
>(real_trx
);
265 real_trx
->SetLogNumber(first_log_number
);
266 real_trx
->SetId(first_seq
);
267 Status s
= real_trx
->SetName(recovered_trx
->name_
);
271 wupt
->prepare_batch_cnt_
= last_prepare_batch_cnt
;
273 for (auto batch
: recovered_trx
->batches_
) {
274 const auto& seq
= batch
.first
;
275 const auto& batch_info
= batch
.second
;
276 auto cnt
= batch_info
.batch_cnt_
? batch_info
.batch_cnt_
: 1;
277 assert(batch_info
.log_number_
);
279 for (size_t i
= 0; i
< cnt
; i
++) {
280 AddPrepared(seq
+ i
);
282 assert(wupt
->unprep_seqs_
.count(seq
) == 0);
283 wupt
->unprep_seqs_
[seq
] = cnt
;
284 KeySetBuilder
keyset_handler(wupt
,
285 txn_db_options_
.rollback_merge_operands
);
286 s
= batch_info
.batch_
->Iterate(&keyset_handler
);
293 wupt
->write_batch_
.Clear();
294 WriteBatchInternal::InsertNoop(wupt
->write_batch_
.GetWriteBatch());
296 real_trx
->SetState(Transaction::PREPARED
);
302 SequenceNumber prev_max
= max_evicted_seq_
;
303 SequenceNumber last_seq
= db_impl_
->GetLatestSequenceNumber();
304 AdvanceMaxEvictedSeq(prev_max
, last_seq
);
305 // Create a gap between max and the next snapshot. This simplifies the logic
306 // in IsInSnapshot by not having to consider the special case of max ==
307 // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
309 db_impl_
->versions_
->SetLastAllocatedSequence(last_seq
+ 1);
310 db_impl_
->versions_
->SetLastSequence(last_seq
+ 1);
311 db_impl_
->versions_
->SetLastPublishedSequence(last_seq
+ 1);
314 // Compaction should start only after max_evicted_seq_ is set.
315 Status s
= EnableAutoCompaction(compaction_enabled_cf_handles
);
320 // Rollback unprepared transactions.
321 for (auto rtxn
: rtxns
) {
322 auto recovered_trx
= rtxn
.second
;
323 if (recovered_trx
->unprepared_
) {
324 s
= RollbackRecoveredTransaction(recovered_trx
);
333 dbimpl
->DeleteAllRecoveredTransactions();
339 Transaction
* WriteUnpreparedTxnDB::BeginTransaction(
340 const WriteOptions
& write_options
, const TransactionOptions
& txn_options
,
341 Transaction
* old_txn
) {
342 if (old_txn
!= nullptr) {
343 ReinitializeTransaction(old_txn
, write_options
, txn_options
);
346 return new WriteUnpreparedTxn(this, write_options
, txn_options
);
350 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
351 struct WriteUnpreparedTxnDB::IteratorState
{
352 IteratorState(WritePreparedTxnDB
* txn_db
, SequenceNumber sequence
,
353 std::shared_ptr
<ManagedSnapshot
> s
,
354 SequenceNumber min_uncommitted
, WriteUnpreparedTxn
* txn
)
355 : callback(txn_db
, sequence
, min_uncommitted
, txn
), snapshot(s
) {}
356 SequenceNumber
MaxVisibleSeq() { return callback
.max_visible_seq(); }
358 WriteUnpreparedTxnReadCallback callback
;
359 std::shared_ptr
<ManagedSnapshot
> snapshot
;
363 static void CleanupWriteUnpreparedTxnDBIterator(void* arg1
, void* /*arg2*/) {
364 delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState
*>(arg1
);
366 } // anonymous namespace
368 Iterator
* WriteUnpreparedTxnDB::NewIterator(const ReadOptions
& options
,
369 ColumnFamilyHandle
* column_family
,
370 WriteUnpreparedTxn
* txn
) {
371 // TODO(lth): Refactor so that this logic is shared with WritePrepared.
372 constexpr bool ALLOW_BLOB
= true;
373 constexpr bool ALLOW_REFRESH
= true;
374 std::shared_ptr
<ManagedSnapshot
> own_snapshot
= nullptr;
375 SequenceNumber snapshot_seq
;
376 SequenceNumber min_uncommitted
= 0;
377 if (options
.snapshot
!= nullptr) {
378 snapshot_seq
= options
.snapshot
->GetSequenceNumber();
380 static_cast_with_check
<const SnapshotImpl
, const Snapshot
>(
384 auto* snapshot
= GetSnapshot();
385 // We take a snapshot to make sure that the related data in the commit map
387 snapshot_seq
= snapshot
->GetSequenceNumber();
389 static_cast_with_check
<const SnapshotImpl
, const Snapshot
>(snapshot
)
391 own_snapshot
= std::make_shared
<ManagedSnapshot
>(db_impl_
, snapshot
);
393 assert(snapshot_seq
!= kMaxSequenceNumber
);
394 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(column_family
)->cfd();
396 new IteratorState(this, snapshot_seq
, own_snapshot
, min_uncommitted
, txn
);
398 db_impl_
->NewIteratorImpl(options
, cfd
, state
->MaxVisibleSeq(),
399 &state
->callback
, !ALLOW_BLOB
, !ALLOW_REFRESH
);
400 db_iter
->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator
, state
, nullptr);
404 Status
KeySetBuilder::PutCF(uint32_t cf
, const Slice
& key
,
405 const Slice
& /*val*/) {
406 txn_
->UpdateWriteKeySet(cf
, key
);
410 Status
KeySetBuilder::DeleteCF(uint32_t cf
, const Slice
& key
) {
411 txn_
->UpdateWriteKeySet(cf
, key
);
415 Status
KeySetBuilder::SingleDeleteCF(uint32_t cf
, const Slice
& key
) {
416 txn_
->UpdateWriteKeySet(cf
, key
);
420 Status
KeySetBuilder::MergeCF(uint32_t cf
, const Slice
& key
,
421 const Slice
& /*val*/) {
422 if (rollback_merge_operands_
) {
423 txn_
->UpdateWriteKeySet(cf
, key
);
428 } // namespace rocksdb
429 #endif // ROCKSDB_LITE