// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#ifndef ROCKSDB_LITE

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;

// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may already have been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex to determine whether a
// key should be visible or not, so we have to remember to check the DB for any
// uncommitted keys that should be visible to us. First, we will need to change
// the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq).
// Any key greater than max_visible_seq should not be visible, because it
// cannot be unprepared by the current transaction and it is not in its
// snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
//
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning that if the same transaction is adding more unprep seqs
// through writes during iteration, these newer writes may not be visible. This
// is not a problem for MySQL though, because it avoids modifying the index as
// it is scanning through it, in order to avoid the Halloween Problem. Instead,
// it scans the index once up front, and modifies based on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible
// key, and special precautions must be taken to avoid performing another
// reseek, leading to an infinite loop.
//
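// To make the cases above concrete, here is an illustrative example with
// hypothetical sequence numbers (not taken from any test):
//
//   snap_seq = 8, unprep_seqs = {10 => 2}    // batch covers seqs 10..11
//   max_unprep_seq = 10 + 2 - 1 = 11
//   max_visible_seq = max(8, 11) = 11
//
// The transaction's own unprepared key at seq 10 is visible (case 1); another
// transaction's committed key at seq 9 is not (case 3); a committed key at
// seq 7 is visible through the snapshot (case 4).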
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class to
      // ensure that the parent will not prematurely filter out our own writes.
      // We will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity.
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  virtual bool IsVisibleFullCheck(SequenceNumber seq) override;

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    SequenceNumber max_unprepared = 0;
    if (unprep_seqs.size()) {
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};
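
// Expected caller pattern (a hedged sketch; the member names and the exact
// GetFromBatchAndDB overload are assumptions, see WriteUnpreparedTxn::Get in
// write_unprepared_txn.cc for the authoritative usage). When the callback is
// not backed by a DB snapshot, the caller must consult valid() after the read:
//
//   WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq,
//                                           min_uncommitted, unprep_seqs_,
//                                           backed_by_snapshot);
//   Status s = write_batch_.GetFromBatchAndDB(db_, options, column_family,
//                                             key, value, &callback);
//   if (!callback.valid()) {
//     // The snapshot this read depended on was released mid-read; retry.
//     s = Status::TryAgain();
//   }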

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value,
                     const bool assume_tracked = false) override;
  virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value,
                     const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value,
                       const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                        const bool assume_tracked = false) override;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key,
                        const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key,
                              const bool assume_tracked = false) override;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key,
                              const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation logic.
  // Snapshot validation will only check the largest sequence number of a key
  // to see if it was committed or not. However, an untracked unprepared write
  // will hide smaller committed sequence numbers.
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.

  virtual Status RebuildFromWriteBatch(WriteBatch*) override;

  virtual uint64_t GetLastLogNumber() const override {
    return last_log_number_;
  }

  void RemoveActiveIterator(Iterator* iter) {
    active_iterators_.erase(
        std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
        active_iterators_.end());
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback is used
  // to handle read-your-own-writes.
  using Transaction::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using Transaction::MultiGet;
  virtual void MultiGet(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        const size_t num_keys, const Slice* keys,
                        PinnableSlice* values, Status* statuses,
                        const bool sorted_input = false) override;

  using Transaction::GetIterator;
  virtual Iterator* GetIterator(const ReadOptions& options) override;
  virtual Iterator* GetIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
                                  const Slice& key,
                                  SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  Status WriteRollbackKeys(const LockTracker& tracked_keys,
                           WriteBatchWithIndex* rollback_batch,
                           ReadCallback* callback, const ReadOptions& roptions);

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append whether
  // write_batch_flush_threshold_ has been exceeded, and then call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB, sketched below.
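  //
  // A minimal sketch of that check (an assumption-level illustration; the
  // authoritative logic lives in write_unprepared_txn.cc):
  //
  //   Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  //     Status s;
  //     if (write_batch_flush_threshold_ > 0 &&
  //         write_batch_.GetWriteBatch()->Count() > 0 &&
  //         write_batch_.GetWriteBatch()->GetDataSize() >
  //             static_cast<size_t>(write_batch_flush_threshold_)) {
  //       s = FlushWriteBatchToDB(/*prepared=*/false);
  //     }
  //     return s;
  //   }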
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, so it simplifies
  // the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not actually
  // locked for efficiency reasons. For recovered transactions, skip unlocking
  // keys when the transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was
  // set, then this is set to GetLastPublishedSequence. This value is useful
  // because it means that for keys that have unprepared seqnos, we can
  // guarantee that no committed keys by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a transaction,
  // but in some cases, we should be able to restore the previously largest
  // value when calling RollbackToSavepoint.
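  //
  // Hypothetical numeric illustration (numbers are assumptions for clarity,
  // not from the source): if largest_validated_seq_ = 20 and this transaction
  // owns unprepared seqs {21 => 5} (so max_unprep_seq = 25), then for a key
  // this transaction wrote, no other transaction's committed version can sit
  // at a seq in (20, 25]: validation at seq 20 proved the key unmodified, and
  // this transaction holds the lock on the key afterwards.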
  SequenceNumber largest_validated_seq_;

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
    // used during RollbackToSavepoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing the set
    // difference. Investigate whether this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have 3 data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_, or unprepared
  // batches that have been written out. Its responsibility is just to track
  // which keys have been modified in every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about
  // savepoints set on unprepared batches that have already been flushed. It
  // holds the snapshot and unprep_seqs at that savepoint, so that the rollback
  // process can determine which keys were visible at that point in time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants (illustrated by the
  // example below):
  //   size(unflushed_save_points_) = size(write_batch_.save_points_)
  //   size(flushed_save_points_) + size(unflushed_save_points_)
  //       = size(save_points_)
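  //
  // Hypothetical example: set savepoints s1 and s2 while writing to the
  // in-memory write_batch_, flush the batch to the DB (moving s1 and s2 from
  // unflushed_save_points_ to flushed_save_points_), then set s3 on the new
  // in-memory batch. At that point:
  //   size(flushed_save_points_)   = 2   // s1, s2
  //   size(unflushed_save_points_) = 1   // s3
  //   size(save_points_)           = 3   // s1, s2, s3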
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;

  // It is currently unsafe to flush a write batch if there are active
  // iterators created from this transaction. This is because we use
  // WriteBatchWithIndex to do merging reads from the DB and the write batch.
  // If we flush the write batch, it is possible that the delta iterator on the
  // iterator will point to invalid memory.
  std::vector<Iterator*> active_iterators_;

  // Untracked keys that we have to roll back.
  //
  // TODO(lth): Currently we do not record untracked keys per-savepoint. This
  // means that when rolling back to savepoints, we have to check all keys in
  // the current transaction for rollback. Note that this is merely
  // inefficient, not incorrect, because we take a snapshot at every savepoint,
  // and we will use that snapshot to construct the rollback batch. The
  // rollback batch will then contain a reissue of the same marker.
  //
  // A more optimal solution would be to only check keys changed since the
  // last savepoint. Also, it may make sense to merge this into tracked_keys_
  // and differentiate between tracked but not locked keys to avoid having two
  // very similar data structures.
  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
  KeySet untracked_keys_;
};

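// A minimal usage sketch (hedged; the option names below are real RocksDB
// APIs, but the values are illustrative assumptions): write-unprepared is
// selected via TransactionDBOptions::write_policy, and
// TransactionOptions::write_batch_flush_threshold controls how large the
// in-memory write batch may grow before it is flushed as an unprepared batch.
//
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;
//   TransactionDB* txn_db = nullptr;
//   Status s = TransactionDB::Open(options, txn_db_options, dbname, &txn_db);
//
//   TransactionOptions txn_options;
//   txn_options.write_batch_flush_threshold = 1 << 20;  // hypothetical: 1 MB
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions(), txn_options);
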
}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE