1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include "utilities/transactions/write_prepared_txn.h"
13 #include "utilities/transactions/write_unprepared_txn_db.h"
// Forward declarations: both types are used by pointer further down in this
// header before (or without) their full definitions being required.
17 class WriteUnpreparedTxnDB
;
18 class WriteUnpreparedTxn
;
// ReadCallback specialization for reads issued by a WriteUnpreparedTxn.
// The bound handed to the ReadCallback base is the larger of the snapshot and
// the transaction's own max unprepared sequence number (see
// CalcMaxVisibleSeq), while the caller's snapshot itself is kept in
// wup_snapshot_.
20 class WriteUnpreparedTxnReadCallback
: public ReadCallback
{
22 WriteUnpreparedTxnReadCallback(WritePreparedTxnDB
* db
,
23 SequenceNumber snapshot
,
24 SequenceNumber min_uncommitted
,
25 WriteUnpreparedTxn
* txn
)
26 // Pass our last uncommitted seq as the snapshot to the parent class to
27 // ensure that the parent will not prematurely filter out own writes. We
28 // will do the exact comparison agaisnt snapshots in IsVisibleFullCheck
30 : ReadCallback(CalcMaxVisibleSeq(txn
, snapshot
), min_uncommitted
),
33 wup_snapshot_(snapshot
) {}
// Visibility check for sequence number `seq`; declared here and defined out
// of line. Per the constructor's comment, this is where the exact comparison
// against the snapshot is performed.
35 virtual bool IsVisibleFullCheck(SequenceNumber seq
) override
;
37 bool CanReseekToSkip() override
{
38 return wup_snapshot_
== max_visible_seq_
;
39 // Otherwise our own writes uncommitted are in db, and the assumptions
40 // behind reseek optimizations are no longer valid.
43 // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
45 static SequenceNumber
CalcMaxVisibleSeq(WriteUnpreparedTxn
* txn
,
46 SequenceNumber snapshot_seq
) {
47 SequenceNumber max_unprepared
= CalcMaxUnpreparedSequenceNumber(txn
);
48 assert(snapshot_seq
< max_unprepared
|| max_unprepared
== 0 ||
49 snapshot_seq
== kMaxSequenceNumber
);
50 return std::max(max_unprepared
, snapshot_seq
);
// Declared here and defined out of line. CalcMaxVisibleSeq uses its result as
// the transaction's max unprepared sequence number and treats a result of 0
// as a special case in its assert.
52 static SequenceNumber
CalcMaxUnpreparedSequenceNumber(
53 WriteUnpreparedTxn
* txn
);
// NOTE(review): presumably the `db` constructor argument -- confirm that it
// is initialized before use.
54 WritePreparedTxnDB
* db_
;
// NOTE(review): presumably the `txn` constructor argument -- confirm that it
// is initialized before use.
55 WriteUnpreparedTxn
* txn_
;
// The snapshot sequence number exactly as passed to the constructor;
// CanReseekToSkip compares it with max_visible_seq_.
56 SequenceNumber wup_snapshot_
;
// Transaction type for the WriteUnprepared policy, derived from
// WritePreparedTxn. Per the member comments further down, the write batch may
// be flushed to the DB whenever it exceeds max_write_batch_size_, and every
// batch written that way is tracked in unprep_seqs_.
59 class WriteUnpreparedTxn
: public WritePreparedTxn
{
// Constructor; declared here and defined out of line. Takes the owning
// WriteUnpreparedTxnDB plus the write and transaction options.
61 WriteUnpreparedTxn(WriteUnpreparedTxnDB
* db
,
62 const WriteOptions
& write_options
,
63 const TransactionOptions
& txn_options
);
// Virtual destructor; defined out of line.
65 virtual ~WriteUnpreparedTxn();
// Write operations overridden by this class. Each `using` declaration keeps
// the remaining TransactionBaseImpl overloads of that name visible next to
// the overrides declared here. All overrides are defined out of line.
67 using TransactionBaseImpl::Put
;
// NOTE(review): unlike the SliceParts overload below, no `value` parameter is
// visible in this declaration -- confirm against the base-class signature.
68 virtual Status
Put(ColumnFamilyHandle
* column_family
, const Slice
& key
,
70 const bool assume_tracked
= false) override
;
// SliceParts overload of Put (key and value given as SliceParts).
71 virtual Status
Put(ColumnFamilyHandle
* column_family
, const SliceParts
& key
,
72 const SliceParts
& value
,
73 const bool assume_tracked
= false) override
;
75 using TransactionBaseImpl::Merge
;
// NOTE(review): no `value` parameter is visible in this declaration --
// confirm against the base-class signature.
76 virtual Status
Merge(ColumnFamilyHandle
* column_family
, const Slice
& key
,
78 const bool assume_tracked
= false) override
;
80 using TransactionBaseImpl::Delete
;
// Delete with the key given as a Slice.
81 virtual Status
Delete(ColumnFamilyHandle
* column_family
, const Slice
& key
,
82 const bool assume_tracked
= false) override
;
// Delete with the key given as SliceParts.
83 virtual Status
Delete(ColumnFamilyHandle
* column_family
,
84 const SliceParts
& key
,
85 const bool assume_tracked
= false) override
;
87 using TransactionBaseImpl::SingleDelete
;
// NOTE(review): no `key` parameter is visible in this declaration (compare
// the Slice overload of Delete above) -- confirm against the base-class
// signature.
88 virtual Status
SingleDelete(ColumnFamilyHandle
* column_family
,
90 const bool assume_tracked
= false) override
;
// SingleDelete with the key given as SliceParts.
91 virtual Status
SingleDelete(ColumnFamilyHandle
* column_family
,
92 const SliceParts
& key
,
93 const bool assume_tracked
= false) override
;
95 virtual Status
RebuildFromWriteBatch(WriteBatch
*) override
{
96 // This function was only useful for recovering prepared transactions, but
97 // is unused for write prepared because a transaction may consist of
98 // multiple write batches.
100 // If there are use cases outside of recovery that can make use of this,
101 // then support could be added.
102 return Status::NotSupported("Not supported for WriteUnprepared");
// Returns a const reference to a SequenceNumber -> size_t map; defined out of
// line. NOTE(review): presumably this exposes unprep_seqs_, which has the
// same type -- confirm in the implementation.
105 const std::map
<SequenceNumber
, size_t>& GetUnpreparedSequenceNumbers();
// Defined out of line. NOTE(review): presumably records `key` under column
// family id `cfid` in write_set_keys_ -- confirm in the implementation.
107 void UpdateWriteKeySet(uint32_t cfid
, const Slice
& key
);
// Overrides of base-class transaction lifecycle hooks; all are declared here
// and defined out of line.
110 void Initialize(const TransactionOptions
& txn_options
) override
;
// Prepare step of the transaction.
112 Status
PrepareInternal() override
;
// Commit path for a transaction that was not prepared first.
114 Status
CommitWithoutPrepareInternal() override
;
// Commit step of the transaction.
115 Status
CommitInternal() override
;
// Rollback step of the transaction.
117 Status
RollbackInternal() override
;
119 // Get and GetIterator need to be overridden so that a ReadCallback that
120 // handles read-your-own-write is used.
// The `using` declarations keep the remaining Transaction overloads of Get
// and GetIterator visible next to the overrides below.
121 using Transaction::Get
;
// Point lookup of `key` in `column_family`; the result is written to `value`.
122 virtual Status
Get(const ReadOptions
& options
,
123 ColumnFamilyHandle
* column_family
, const Slice
& key
,
124 PinnableSlice
* value
) override
;
126 using Transaction::GetIterator
;
// Iterator overload that takes no column family argument.
127 virtual Iterator
* GetIterator(const ReadOptions
& options
) override
;
// Iterator overload for an explicit column family.
128 virtual Iterator
* GetIterator(const ReadOptions
& options
,
129 ColumnFamilyHandle
* column_family
) override
;
// Classes granted access to this class's non-public members. The first three
// names look like generated unit-test classes (presumably gtest TEST_F
// expansions -- confirm); the last is the owning DB type.
132 friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test
;
133 friend class WriteUnpreparedTransactionTest_RecoveryTest_Test
;
134 friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test
;
135 friend class WriteUnpreparedTxnDB
;
// Size-triggered flush entry point; see the comment on max_write_batch_size_
// below. Defined out of line.
137 Status
MaybeFlushWriteBatchToDB();
// Flushes the write batch to the DB; defined out of line.
// NOTE(review): the exact meaning of `prepared` is not visible in this
// header -- confirm in the implementation.
138 Status
FlushWriteBatchToDB(bool prepared
);
140 // For write unprepared, we check on every writebatch append to see if
141 // max_write_batch_size_ has been exceeded, and then call
142 // FlushWriteBatchToDB if so. This logic is encapsulated in
143 // MaybeFlushWriteBatchToDB.
144 size_t max_write_batch_size_
;
// Pointer to the owning WriteUnpreparedTxnDB.
// NOTE(review): presumably set from the constructor's `db` argument --
// confirm.
145 WriteUnpreparedTxnDB
* wupt_db_
;
147 // Ordered list of unprep_seq sequence numbers that we have already written
// to the DB (NOTE(review): the end of this sentence was cut off in the
// original comment; wording reconstructed -- confirm).
150 // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
151 // written by this transaction.
153 // Note that this contains both prepared and unprepared batches, since they
154 // are treated similarly in the prepare heap/commit map, which simplifies the
// handling (NOTE(review): the end of this sentence was cut off in the
// original comment -- confirm the intended wording).
156 std::map
<SequenceNumber
, size_t> unprep_seqs_
;
158 // Set of keys that have been written to and have already been written to DB
159 // (i.e. not in write_batch_).
// Keyed by a uint32_t; presumably the column family id, matching
// UpdateWriteKeySet's `cfid` parameter -- confirm.
161 std::map
<uint32_t, std::vector
<std::string
>> write_set_keys_
;
164 } // namespace rocksdb
166 #endif // ROCKSDB_LITE