1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
12 #include "utilities/transactions/write_unprepared_txn_db.h"
13 #include "rocksdb/utilities/transaction_db.h"
14 #include "util/cast_util.h"
18 // Instead of reconstructing a Transaction object, and calling rollback on it,
19 // we can be more efficient with RollbackRecoveredTransaction by skipping
20 // unnecessary steps (e.g. updating CommitMap, reconstructing the keyset).
// Rolls back one recovered (crashed-before-commit) unprepared transaction.
// Each of its write batches is replayed (largest sequence number first)
// through a local WriteBatch::Handler that builds a compensating batch:
// Put(previous value) when a pre-transaction value is still readable,
// Delete when there was none. The compensating batch is then written with
// the WAL disabled.
// NOTE(review): this extraction is incomplete -- the embedded original line
// numbers jump (e.g. 69->71->75, 78->80, 87->90->95, 148->153), so member
// initializers (db_, handles_), the ReadOptions/not_used setup, several
// return statements and closing braces are missing from this view. Do not
// treat the visible text as compilable.
21 Status
WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
22 const DBImpl::RecoveredTransaction
* rtxn
) {
23 // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
// Only unprepared recovered transactions are rolled back here.
24 assert(rtxn
->unprepared_
);
25 auto cf_map_shared_ptr
= WritePreparedTxnDB::GetCFHandleMap();
26 auto cf_comp_map_shared_ptr
= WritePreparedTxnDB::GetCFComparatorMap();
27 WriteOptions w_options
;
// NOTE(review): this comment is truncated by the extraction (original
// lines 29/31 are elided); the WAL is deliberately skipped below.
28 // If we crash during recovery, we can just recalculate and rewrite the
30 w_options
.disableWAL
= true;
32 // Iterate starting with largest sequence number.
33 for (auto it
= rtxn
->batches_
.rbegin(); it
!= rtxn
->batches_
.rend(); it
++) {
// The snapshot for the rollback read: everything strictly below this
// batch's starting sequence number.
34 auto last_visible_txn
= it
->first
- 1;
35 const auto& batch
= it
->second
.batch_
;
36 WriteBatch rollback_batch
;
// Handler that, for each key touched by the recovered batch, appends the
// compensating operation to rollback_batch_.
38 struct RollbackWriteBatchBuilder
: public WriteBatch::Handler
{
41 WritePreparedTxnReadCallback callback
;
// Destination for the compensating operations (not owned).
42 WriteBatch
* rollback_batch_
;
43 std::map
<uint32_t, const Comparator
*>& comparators_
;
44 std::map
<uint32_t, ColumnFamilyHandle
*>& handles_
;
45 using CFKeys
= std::set
<Slice
, SetComparator
>;
// Per-CF set of keys already rolled back, so each key is handled once.
46 std::map
<uint32_t, CFKeys
> keys_
;
47 bool rollback_merge_operands_
;
48 RollbackWriteBatchBuilder(
49 DBImpl
* db
, WritePreparedTxnDB
* wpt_db
, SequenceNumber snap_seq
,
50 WriteBatch
* dst_batch
,
51 std::map
<uint32_t, const Comparator
*>& comparators
,
52 std::map
<uint32_t, ColumnFamilyHandle
*>& handles
,
53 bool rollback_merge_operands
)
// Member-initializer list follows. NOTE(review): the ':' introducing it
// and the db_/handles_ initializers (original lines 54/59) are elided.
55 callback(wpt_db
, snap_seq
,
56 0), // 0 disables min_uncommitted optimization
57 rollback_batch_(dst_batch
),
58 comparators_(comparators
),
60 rollback_merge_operands_(rollback_merge_operands
) {}
// Appends the compensating op for (cf, key): Put of the pre-transaction
// value if one is readable at the snapshot, otherwise Delete.
62 Status
Rollback(uint32_t cf
, const Slice
& key
) {
64 CFKeys
& cf_keys
= keys_
[cf
];
65 if (cf_keys
.size() == 0) { // just inserted
66 auto cmp
= comparators_
[cf
];
67 keys_
[cf
] = CFKeys(SetComparator(cmp
));
69 auto res
= cf_keys
.insert(key
);
// NOTE(review): the 'if (res.second ==' half of this condition
// (original line 70) is elided; only the second half survives.
71 false) { // second is false if an element already existed.
75 PinnableSlice pinnable_val
;
77 auto cf_handle
= handles_
[cf
];
// Read the value as of the rollback snapshot.
// NOTE(review): '¬_used' below is mojibake -- almost certainly
// '&not_used' with '&not' decoded as the HTML entity for U+00AC.
// The roptions/not_used declarations (lines 73-76) are elided.
78 s
= db_
->GetImpl(roptions
, cf_handle
, key
, &pinnable_val
, ¬_used
,
80 assert(s
.ok() || s
.IsNotFound());
82 s
= rollback_batch_
->Put(cf_handle
, key
, pinnable_val
);
84 } else if (s
.IsNotFound()) {
85 // There has been no readable value before txn. By adding a delete we
86 // make sure that there will be none afterwards either.
87 s
= rollback_batch_
->Delete(cf_handle
, key
);
// NOTE(review): the error-return branch (lines 88-94) is elided here.
90 // Unexpected status. Return it to the user.
// Every write type funnels into Rollback(); values are irrelevant
// because only the key's pre-transaction state matters.
95 Status
PutCF(uint32_t cf
, const Slice
& key
,
96 const Slice
& /*val*/) override
{
97 return Rollback(cf
, key
);
100 Status
DeleteCF(uint32_t cf
, const Slice
& key
) override
{
101 return Rollback(cf
, key
);
104 Status
SingleDeleteCF(uint32_t cf
, const Slice
& key
) override
{
105 return Rollback(cf
, key
);
// Merges are only rolled back when the option requests it.
108 Status
MergeCF(uint32_t cf
, const Slice
& key
,
109 const Slice
& /*val*/) override
{
110 if (rollback_merge_operands_
) {
111 return Rollback(cf
, key
);
117 // Recovered batches do not contain 2PC markers.
118 Status
MarkNoop(bool) override
{ return Status::InvalidArgument(); }
119 Status
MarkBeginPrepare(bool) override
{
120 return Status::InvalidArgument();
122 Status
MarkEndPrepare(const Slice
&) override
{
123 return Status::InvalidArgument();
125 Status
MarkCommit(const Slice
&) override
{
126 return Status::InvalidArgument();
128 Status
MarkRollback(const Slice
&) override
{
129 return Status::InvalidArgument();
131 } rollback_handler(db_impl_
, this, last_visible_txn
, &rollback_batch
,
132 *cf_comp_map_shared_ptr
.get(), *cf_map_shared_ptr
.get(),
133 txn_db_options_
.rollback_merge_operands
);
// Replay the recovered batch through the handler to populate
// rollback_batch.
135 auto s
= batch
->Iterate(&rollback_handler
);
140 // The Rollback marker will be used as a batch separator
141 WriteBatchInternal::MarkRollback(&rollback_batch
, rtxn
->name_
);
143 const uint64_t kNoLogRef
= 0;
144 const bool kDisableMemtable
= true;
145 const size_t kOneBatch
= 1;
146 uint64_t seq_used
= kMaxSequenceNumber
;
// Persist the compensating batch: WAL disabled (w_options above), memtable
// write enabled (!kDisableMemtable == false for the disable flag).
147 s
= db_impl_
->WriteImpl(w_options
, &rollback_batch
, nullptr, nullptr,
148 kNoLogRef
, !kDisableMemtable
, &seq_used
, kOneBatch
);
// NOTE(review): comment truncated by extraction (line 154 elided); the
// publish step is the SetLastPublishedSequence call below.
153 // If two_write_queues, we must manually release the sequence number to
155 if (db_impl_
->immutable_db_options().two_write_queues
) {
156 db_impl_
->SetLastPublishedSequence(seq_used
);
// One-time setup after DB open: installs the WritePrepared snapshot checker
// and a pre-release callback that marks each sub-batch committed, registers
// the column-family handles, re-enables auto compaction for the requested
// CFs, then rebuilds 'real' transactions from recovered shell transactions.
// AddPrepared runs for every recovered sub-batch BEFORE
// AdvanceMaxEvictedSeq, and rollback of unprepared transactions only after
// it (see the comment at original lines 230-232) -- hence the two loops
// over rtxns.
// NOTE(review): this extraction is incomplete -- embedded line numbers jump
// (e.g. 174->176, 182->187, 199->203, 244->246, 272->279, 296->305), so
// constructor bodies, error-handling branches, the 'wupt' declaration and
// various closing braces are missing from this view.
163 Status
WriteUnpreparedTxnDB::Initialize(
164 const std::vector
<size_t>& compaction_enabled_cf_indices
,
165 const std::vector
<ColumnFamilyHandle
*>& handles
) {
166 // TODO(lth): Reduce code duplication in this function.
167 auto dbimpl
= reinterpret_cast<DBImpl
*>(GetRootDB());
168 assert(dbimpl
!= nullptr);
170 db_impl_
->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
171 // A callback to commit a single sub-batch
172 class CommitSubBatchPreReleaseCallback
: public PreReleaseCallback
{
// NOTE(review): the constructor's initializer/body (line 175) is elided.
174 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB
* db
)
176 virtual Status
Callback(SequenceNumber commit_seq
,
177 bool is_mem_disabled
) override
{
// Silence unused-parameter warnings in release builds; the assert
// below only fires in debug builds.
179 (void)is_mem_disabled
;
181 assert(!is_mem_disabled
);
// A sub-batch commits at its own sequence number.
182 db_
->AddCommitted(commit_seq
, commit_seq
);
187 WritePreparedTxnDB
* db_
;
189 db_impl_
->SetRecoverableStatePreReleaseCallback(
190 new CommitSubBatchPreReleaseCallback(this));
192 // PessimisticTransactionDB::Initialize
193 for (auto cf_ptr
: handles
) {
194 AddColumnFamily(cf_ptr
);
// Verify each CF's options are compatible with this TxnDB.
// NOTE(review): the status checks between GetDescriptor and
// VerifyCFOptions (lines 200-202, 204-208) are elided.
197 for (auto handle
: handles
) {
198 ColumnFamilyDescriptor cfd
;
199 Status s
= handle
->GetDescriptor(&cfd
);
203 s
= VerifyCFOptions(cfd
.options
);
209 // Re-enable compaction for the column families that initially had
210 // compaction enabled.
211 std::vector
<ColumnFamilyHandle
*> compaction_enabled_cf_handles
;
212 compaction_enabled_cf_handles
.reserve(compaction_enabled_cf_indices
.size());
213 for (auto index
: compaction_enabled_cf_indices
) {
214 compaction_enabled_cf_handles
.push_back(handles
[index
]);
217 Status s
= EnableAutoCompaction(compaction_enabled_cf_handles
);
222 // create 'real' transactions from recovered shell transactions
223 auto rtxns
= dbimpl
->recovered_transactions();
224 for (auto rtxn
: rtxns
) {
225 auto recovered_trx
= rtxn
.second
;
226 assert(recovered_trx
);
227 assert(recovered_trx
->batches_
.size() >= 1);
228 assert(recovered_trx
->name_
.length());
230 // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
231 // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
232 // two iterations is required.
233 if (recovered_trx
->unprepared_
) {
// Recovered state must survive a crash during recovery: sync writes.
237 WriteOptions w_options
;
238 w_options
.sync
= true;
239 TransactionOptions t_options
;
// Identity of the reconstructed transaction comes from its first
// (lowest-sequence) recovered batch.
241 auto first_log_number
= recovered_trx
->batches_
.begin()->second
.log_number_
;
242 auto first_seq
= recovered_trx
->batches_
.begin()->first
;
243 auto last_prepare_batch_cnt
=
244 recovered_trx
->batches_
.begin()->second
.batch_cnt_
;
246 Transaction
* real_trx
= BeginTransaction(w_options
, t_options
, nullptr);
// NOTE(review): the variable receiving this cast (presumably 'wupt',
// used below) is declared on an elided line (247-248).
249 static_cast_with_check
<WriteUnpreparedTxn
, Transaction
>(real_trx
);
251 real_trx
->SetLogNumber(first_log_number
);
252 real_trx
->SetId(first_seq
);
253 s
= real_trx
->SetName(recovered_trx
->name_
);
257 wupt
->prepare_batch_cnt_
= last_prepare_batch_cnt
;
// Register every sub-batch of every recovered batch as prepared, and
// rebuild the transaction's unprepared-sequence map and key set.
259 for (auto batch
: recovered_trx
->batches_
) {
260 const auto& seq
= batch
.first
;
261 const auto& batch_info
= batch
.second
;
// A batch_cnt_ of 0 means the count was not recorded; treat as 1.
262 auto cnt
= batch_info
.batch_cnt_
? batch_info
.batch_cnt_
: 1;
263 assert(batch_info
.log_number_
);
265 for (size_t i
= 0; i
< cnt
; i
++) {
266 AddPrepared(seq
+ i
);
268 assert(wupt
->unprep_seqs_
.count(seq
) == 0);
269 wupt
->unprep_seqs_
[seq
] = cnt
;
270 KeySetBuilder
keyset_handler(wupt
,
271 txn_db_options_
.rollback_merge_operands
);
272 s
= batch_info
.batch_
->Iterate(&keyset_handler
);
// Reset the txn's in-memory write batch; its writes already live in the
// recovered batches.
279 wupt
->write_batch_
.Clear();
280 WriteBatchInternal::InsertNoop(wupt
->write_batch_
.GetWriteBatch());
282 real_trx
->SetState(Transaction::PREPARED
);
// Advance max_evicted_seq_ up to the latest sequence so the rollback
// pass below is permitted (see comment at lines 230-232).
288 SequenceNumber prev_max
= max_evicted_seq_
;
289 SequenceNumber last_seq
= db_impl_
->GetLatestSequenceNumber();
290 AdvanceMaxEvictedSeq(prev_max
, last_seq
);
292 // Rollback unprepared transactions.
293 for (auto rtxn
: rtxns
) {
294 auto recovered_trx
= rtxn
.second
;
295 if (recovered_trx
->unprepared_
) {
296 s
= RollbackRecoveredTransaction(recovered_trx
);
305 dbimpl
->DeleteAllRecoveredTransactions();
// Creates a write-unprepared transaction. When old_txn is non-null it is
// re-initialized in place for reuse; otherwise a fresh WriteUnpreparedTxn
// is allocated. NOTE(review): the first branch's return statement and the
// 'else' line (original lines 316-317) are elided from this view.
311 Transaction
* WriteUnpreparedTxnDB::BeginTransaction(
312 const WriteOptions
& write_options
, const TransactionOptions
& txn_options
,
313 Transaction
* old_txn
) {
314 if (old_txn
!= nullptr) {
315 ReinitializeTransaction(old_txn
, write_options
, txn_options
);
318 return new WriteUnpreparedTxn(this, write_options
, txn_options
);
322 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
// Allocated per-iterator in NewIterator and deleted by
// CleanupWriteUnpreparedTxnDBIterator when the iterator is destroyed.
// NOTE(review): the closing '};' (original line 331) is elided here.
323 struct WriteUnpreparedTxnDB::IteratorState
{
324 IteratorState(WritePreparedTxnDB
* txn_db
, SequenceNumber sequence
,
325 std::shared_ptr
<ManagedSnapshot
> s
,
326 SequenceNumber min_uncommitted
, WriteUnpreparedTxn
* txn
)
327 : callback(txn_db
, sequence
, min_uncommitted
, txn
), snapshot(s
) {}
// Visibility callback used by the iterator for reads at 'sequence'.
329 WriteUnpreparedTxnReadCallback callback
;
// Keeps the DB-owned snapshot alive for the iterator's lifetime; may wrap
// a snapshot taken on the caller's behalf.
330 std::shared_ptr
<ManagedSnapshot
> snapshot
;
// Iterator cleanup hook (registered via RegisterCleanup in NewIterator):
// arg1 is the heap-allocated IteratorState; arg2 is unused.
// NOTE(review): the function's own closing brace (original line 336) is
// elided; line 337 closes the enclosing anonymous namespace.
334 static void CleanupWriteUnpreparedTxnDBIterator(void* arg1
, void* /*arg2*/) {
335 delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState
*>(arg1
);
337 } // anonymous namespace
// Builds a DB iterator whose visibility is filtered through a
// WriteUnpreparedTxnReadCallback for the given transaction. If the caller
// supplied a snapshot in 'options' its sequence is used; otherwise a
// snapshot is taken here and ownership is held by the IteratorState (via
// own_snapshot) until iterator cleanup.
// NOTE(review): this extraction is incomplete -- embedded line numbers
// jump (e.g. 349->351->355, 358->360->362, 367->369, 371->375), so the
// min_uncommitted assignments, the snapshot-release comment, the 'state'
// declaration and the final 'return db_iter;' are missing from this view.
339 Iterator
* WriteUnpreparedTxnDB::NewIterator(const ReadOptions
& options
,
340 ColumnFamilyHandle
* column_family
,
341 WriteUnpreparedTxn
* txn
) {
342 // TODO(lth): Refactor so that this logic is shared with WritePrepared.
343 constexpr bool ALLOW_BLOB
= true;
344 constexpr bool ALLOW_REFRESH
= true;
345 std::shared_ptr
<ManagedSnapshot
> own_snapshot
= nullptr;
346 SequenceNumber snapshot_seq
;
347 SequenceNumber min_uncommitted
= 0;
348 if (options
.snapshot
!= nullptr) {
// Caller-provided snapshot: read at its sequence number.
349 snapshot_seq
= options
.snapshot
->GetSequenceNumber();
351 static_cast_with_check
<const SnapshotImpl
, const Snapshot
>(
// No caller snapshot: take one so commit-map data stays valid.
355 auto* snapshot
= GetSnapshot();
356 // We take a snapshot to make sure that the related data in the commit map
358 snapshot_seq
= snapshot
->GetSequenceNumber();
360 static_cast_with_check
<const SnapshotImpl
, const Snapshot
>(snapshot
)
// Hold the snapshot for the iterator's lifetime.
362 own_snapshot
= std::make_shared
<ManagedSnapshot
>(db_impl_
, snapshot
);
364 assert(snapshot_seq
!= kMaxSequenceNumber
);
365 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(column_family
)->cfd();
// NOTE(review): the 'state' variable receiving this IteratorState (and
// the db_iter assignment below) are declared on elided lines (366/368).
367 new IteratorState(this, snapshot_seq
, own_snapshot
, min_uncommitted
, txn
);
369 db_impl_
->NewIteratorImpl(options
, cfd
, snapshot_seq
, &state
->callback
,
370 !ALLOW_BLOB
, !ALLOW_REFRESH
);
// Ensure the IteratorState is freed when the iterator is destroyed.
371 db_iter
->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator
, state
, nullptr);
// Records the key of every Put into the owning transaction's write key set
// (value is irrelevant for key tracking). NOTE(review): the return
// statement and closing brace (original lines 378-379) are elided here.
375 Status
KeySetBuilder::PutCF(uint32_t cf
, const Slice
& key
,
376 const Slice
& /*val*/) {
377 txn_
->UpdateWriteKeySet(cf
, key
);
// Records the key of every Delete into the owning transaction's write key
// set. NOTE(review): the return statement and closing brace (original
// lines 383-384) are elided here.
381 Status
KeySetBuilder::DeleteCF(uint32_t cf
, const Slice
& key
) {
382 txn_
->UpdateWriteKeySet(cf
, key
);
// Records the key of every SingleDelete into the owning transaction's
// write key set. NOTE(review): the return statement and closing brace
// (original lines 388-389) are elided here.
386 Status
KeySetBuilder::SingleDeleteCF(uint32_t cf
, const Slice
& key
) {
387 txn_
->UpdateWriteKeySet(cf
, key
);
// Records merge keys into the owning transaction's write key set, but only
// when rollback of merge operands was requested (rollback_merge_operands_).
// NOTE(review): the closing braces / return statement (original lines
// 395-398) are elided here.
391 Status
KeySetBuilder::MergeCF(uint32_t cf
, const Slice
& key
,
392 const Slice
& /*val*/) {
393 if (rollback_merge_operands_
) {
394 txn_
->UpdateWriteKeySet(cf
, key
);
399 } // namespace rocksdb
400 #endif // ROCKSDB_LITE