]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "utilities/transactions/write_unprepared_txn.h" | |
f67539c2 | 9 | #include "db/db_impl/db_impl.h" |
11fdf7f2 TL |
10 | #include "util/cast_util.h" |
11 | #include "utilities/transactions/write_unprepared_txn_db.h" | |
12 | ||
f67539c2 | 13 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 | 14 | |
494da23a | 15 | bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) { |
11fdf7f2 TL |
16 | // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is |
17 | // in unprep_seqs, we have to check if seq is equal to prep_seq or any of | |
18 | // the prepare_batch_cnt seq nums after it. | |
19 | // | |
20 | // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is | |
21 | // large. | |
f67539c2 | 22 | for (const auto& it : unprep_seqs_) { |
11fdf7f2 TL |
23 | if (it.first <= seq && seq < it.first + it.second) { |
24 | return true; | |
25 | } | |
26 | } | |
27 | ||
f67539c2 TL |
28 | bool snap_released = false; |
29 | auto ret = | |
30 | db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released); | |
31 | assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot); | |
32 | snap_released_ |= snap_released; | |
33 | return ret; | |
11fdf7f2 TL |
34 | } |
35 | ||
36 | WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, | |
37 | const WriteOptions& write_options, | |
38 | const TransactionOptions& txn_options) | |
f67539c2 TL |
39 | : WritePreparedTxn(txn_db, write_options, txn_options), |
40 | wupt_db_(txn_db), | |
41 | last_log_number_(0), | |
42 | recovered_txn_(false), | |
43 | largest_validated_seq_(0) { | |
44 | if (txn_options.write_batch_flush_threshold < 0) { | |
45 | write_batch_flush_threshold_ = | |
46 | txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold; | |
47 | } else { | |
48 | write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold; | |
49 | } | |
11fdf7f2 TL |
50 | } |
51 | ||
52 | WriteUnpreparedTxn::~WriteUnpreparedTxn() { | |
53 | if (!unprep_seqs_.empty()) { | |
54 | assert(log_number_ > 0); | |
55 | assert(GetId() > 0); | |
56 | assert(!name_.empty()); | |
57 | ||
58 | // We should rollback regardless of GetState, but some unit tests that | |
59 | // test crash recovery run the destructor assuming that rollback does not | |
60 | // happen, so that rollback during recovery can be exercised. | |
f67539c2 TL |
61 | if (GetState() == STARTED || GetState() == LOCKS_STOLEN) { |
62 | auto s = RollbackInternal(); | |
11fdf7f2 | 63 | assert(s.ok()); |
f67539c2 TL |
64 | if (!s.ok()) { |
65 | ROCKS_LOG_FATAL( | |
66 | wupt_db_->info_log_, | |
67 | "Rollback of WriteUnprepared transaction failed in destructor: %s", | |
68 | s.ToString().c_str()); | |
69 | } | |
11fdf7f2 TL |
70 | dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( |
71 | log_number_); | |
72 | } | |
73 | } | |
f67539c2 | 74 | |
20effc67 | 75 | // Clear the tracked locks so that ~PessimisticTransaction does not |
f67539c2 TL |
76 | // try to unlock keys for recovered transactions. |
77 | if (recovered_txn_) { | |
20effc67 | 78 | tracked_locks_->Clear(); |
f67539c2 | 79 | } |
11fdf7f2 TL |
80 | } |
81 | ||
82 | void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { | |
83 | PessimisticTransaction::Initialize(txn_options); | |
f67539c2 TL |
84 | if (txn_options.write_batch_flush_threshold < 0) { |
85 | write_batch_flush_threshold_ = | |
86 | txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold; | |
87 | } else { | |
88 | write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold; | |
89 | } | |
90 | ||
11fdf7f2 | 91 | unprep_seqs_.clear(); |
f67539c2 TL |
92 | flushed_save_points_.reset(nullptr); |
93 | unflushed_save_points_.reset(nullptr); | |
94 | recovered_txn_ = false; | |
95 | largest_validated_seq_ = 0; | |
96 | assert(active_iterators_.empty()); | |
97 | active_iterators_.clear(); | |
98 | untracked_keys_.clear(); | |
99 | } | |
100 | ||
101 | Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) { | |
102 | Status s; | |
103 | if (active_iterators_.empty()) { | |
104 | s = MaybeFlushWriteBatchToDB(); | |
105 | if (!s.ok()) { | |
106 | return s; | |
107 | } | |
108 | } | |
109 | s = do_write(); | |
110 | if (s.ok()) { | |
111 | if (snapshot_) { | |
112 | largest_validated_seq_ = | |
113 | std::max(largest_validated_seq_, snapshot_->GetSequenceNumber()); | |
114 | } else { | |
115 | // TODO(lth): We should use the same number as tracked_at_seq in TryLock, | |
116 | // because what is actually being tracked is the sequence number at which | |
117 | // this key was locked at. | |
118 | largest_validated_seq_ = db_impl_->GetLastPublishedSequence(); | |
119 | } | |
120 | } | |
121 | return s; | |
11fdf7f2 TL |
122 | } |
123 | ||
124 | Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, | |
494da23a TL |
125 | const Slice& key, const Slice& value, |
126 | const bool assume_tracked) { | |
f67539c2 TL |
127 | return HandleWrite([&]() { |
128 | return TransactionBaseImpl::Put(column_family, key, value, assume_tracked); | |
129 | }); | |
11fdf7f2 TL |
130 | } |
131 | ||
132 | Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, | |
494da23a TL |
133 | const SliceParts& key, const SliceParts& value, |
134 | const bool assume_tracked) { | |
f67539c2 TL |
135 | return HandleWrite([&]() { |
136 | return TransactionBaseImpl::Put(column_family, key, value, assume_tracked); | |
137 | }); | |
11fdf7f2 TL |
138 | } |
139 | ||
140 | Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family, | |
494da23a TL |
141 | const Slice& key, const Slice& value, |
142 | const bool assume_tracked) { | |
f67539c2 TL |
143 | return HandleWrite([&]() { |
144 | return TransactionBaseImpl::Merge(column_family, key, value, | |
145 | assume_tracked); | |
146 | }); | |
11fdf7f2 TL |
147 | } |
148 | ||
149 | Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family, | |
494da23a | 150 | const Slice& key, const bool assume_tracked) { |
f67539c2 TL |
151 | return HandleWrite([&]() { |
152 | return TransactionBaseImpl::Delete(column_family, key, assume_tracked); | |
153 | }); | |
11fdf7f2 TL |
154 | } |
155 | ||
156 | Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family, | |
494da23a TL |
157 | const SliceParts& key, |
158 | const bool assume_tracked) { | |
f67539c2 TL |
159 | return HandleWrite([&]() { |
160 | return TransactionBaseImpl::Delete(column_family, key, assume_tracked); | |
161 | }); | |
11fdf7f2 TL |
162 | } |
163 | ||
164 | Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, | |
494da23a TL |
165 | const Slice& key, |
166 | const bool assume_tracked) { | |
f67539c2 TL |
167 | return HandleWrite([&]() { |
168 | return TransactionBaseImpl::SingleDelete(column_family, key, | |
169 | assume_tracked); | |
170 | }); | |
11fdf7f2 TL |
171 | } |
172 | ||
173 | Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, | |
494da23a TL |
174 | const SliceParts& key, |
175 | const bool assume_tracked) { | |
f67539c2 TL |
176 | return HandleWrite([&]() { |
177 | return TransactionBaseImpl::SingleDelete(column_family, key, | |
178 | assume_tracked); | |
179 | }); | |
180 | } | |
181 | ||
182 | // WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For | |
183 | // WriteUnprepared, the write batches have already been written into the | |
184 | // database during WAL replay, so all we have to do is just to "retrack" the key | |
185 | // so that rollbacks are possible. | |
186 | // | |
187 | // Calling TryLock instead of TrackKey is also possible, but as an optimization, | |
188 | // recovered transactions do not hold locks on their keys. This follows the | |
189 | // implementation in PessimisticTransactionDB::Initialize where we set | |
190 | // skip_concurrency_control to true. | |
191 | Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) { | |
192 | struct TrackKeyHandler : public WriteBatch::Handler { | |
193 | WriteUnpreparedTxn* txn_; | |
194 | bool rollback_merge_operands_; | |
195 | ||
196 | TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands) | |
197 | : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} | |
198 | ||
199 | Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { | |
200 | txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, | |
201 | false /* read_only */, true /* exclusive */); | |
202 | return Status::OK(); | |
203 | } | |
204 | ||
205 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |
206 | txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, | |
207 | false /* read_only */, true /* exclusive */); | |
208 | return Status::OK(); | |
209 | } | |
210 | ||
211 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |
212 | txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, | |
213 | false /* read_only */, true /* exclusive */); | |
214 | return Status::OK(); | |
215 | } | |
216 | ||
217 | Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { | |
218 | if (rollback_merge_operands_) { | |
219 | txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber, | |
220 | false /* read_only */, true /* exclusive */); | |
221 | } | |
222 | return Status::OK(); | |
223 | } | |
224 | ||
225 | // Recovered batches do not contain 2PC markers. | |
226 | Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } | |
227 | ||
228 | Status MarkEndPrepare(const Slice&) override { | |
229 | return Status::InvalidArgument(); | |
230 | } | |
231 | ||
232 | Status MarkNoop(bool) override { return Status::InvalidArgument(); } | |
233 | ||
234 | Status MarkCommit(const Slice&) override { | |
235 | return Status::InvalidArgument(); | |
236 | } | |
237 | ||
238 | Status MarkRollback(const Slice&) override { | |
239 | return Status::InvalidArgument(); | |
240 | } | |
241 | }; | |
242 | ||
243 | TrackKeyHandler handler(this, | |
244 | wupt_db_->txn_db_options_.rollback_merge_operands); | |
245 | return wb->Iterate(&handler); | |
11fdf7f2 TL |
246 | } |
247 | ||
248 | Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { | |
249 | const bool kPrepared = true; | |
250 | Status s; | |
f67539c2 TL |
251 | if (write_batch_flush_threshold_ > 0 && |
252 | write_batch_.GetWriteBatch()->Count() > 0 && | |
253 | write_batch_.GetDataSize() > | |
254 | static_cast<size_t>(write_batch_flush_threshold_)) { | |
11fdf7f2 TL |
255 | assert(GetState() != PREPARED); |
256 | s = FlushWriteBatchToDB(!kPrepared); | |
11fdf7f2 TL |
257 | } |
258 | return s; | |
259 | } | |
260 | ||
f67539c2 TL |
261 | Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { |
262 | // If the current write batch contains savepoints, then some special handling | |
263 | // is required so that RollbackToSavepoint can work. | |
264 | // | |
265 | // RollbackToSavepoint is not supported after Prepare() is called, so only do | |
266 | // this for unprepared batches. | |
267 | if (!prepared && unflushed_save_points_ != nullptr && | |
268 | !unflushed_save_points_->empty()) { | |
269 | return FlushWriteBatchWithSavePointToDB(); | |
270 | } | |
271 | ||
272 | return FlushWriteBatchToDBInternal(prepared); | |
11fdf7f2 TL |
273 | } |
274 | ||
// Writes the current write batch to the DB (WAL and memtable) as one
// unprepared batch, or as the prepared batch when `prepared` is true.
//
// Steps, in order:
//   1. Require a transaction name (debug builds may autogenerate one).
//   2. Record keys of the batch that hold no point lock in untracked_keys_.
//   3. Turn the leading Noop marker into the end-of-prepare marker.
//   4. Write the batch through WriteImpl with an AddPreparedCallback.
//   5. Record the sequence number used in unprep_seqs_ (and as the txn id if
//      none is set) and, for unprepared flushes, reset the write batch.
//
// Returns the status of the WriteImpl call, or InvalidArgument when no name
// has been set.
f67539c2 | 275 | Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
11fdf7f2 | 276 | if (name_.empty()) {
f67539c2 TL |
277 | assert(!prepared); |
278 | #ifndef NDEBUG | |
279 | static std::atomic_ullong autogen_id{0}; | |
280 | // To avoid changing all tests to call SetName, just autogenerate one. | |
281 | if (wupt_db_->txn_db_options_.autogenerate_name) { | |
20effc67 TL |
282 | auto s = |
283 | SetName(std::string("autoxid") + ToString(autogen_id.fetch_add(1))); | |
284 | assert(s.ok()); | |
f67539c2 TL |
285 | } else |
286 | #endif | |
// In release builds (NDEBUG) the block below is unconditional: an unnamed
// transaction cannot be flushed.
287 | { | |
288 | return Status::InvalidArgument("Cannot write to DB without SetName."); | |
289 | } | |
11fdf7f2 TL |
290 | } |
291 | ||
// Handler that walks the batch and appends every key without a point lock in
// tracked_locks_ to untracked_keys_ (these are read again in
// WriteRollbackKeys).
f67539c2 TL |
292 | struct UntrackedKeyHandler : public WriteBatch::Handler { |
293 | WriteUnpreparedTxn* txn_; | |
294 | bool rollback_merge_operands_; | |
295 | ||
296 | UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands) | |
297 | : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} | |
298 | ||
299 | Status AddUntrackedKey(uint32_t cf, const Slice& key) { | |
300 | auto str = key.ToString(); | |
20effc67 TL |
301 | PointLockStatus lock_status = |
302 | txn_->tracked_locks_->GetPointLockStatus(cf, str); | |
303 | if (!lock_status.locked) { | |
f67539c2 TL |
304 | txn_->untracked_keys_[cf].push_back(str); |
305 | } | |
306 | return Status::OK(); | |
307 | } | |
308 | ||
309 | Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { | |
310 | return AddUntrackedKey(cf, key); | |
311 | } | |
312 | ||
313 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |
314 | return AddUntrackedKey(cf, key); | |
315 | } | |
316 | ||
317 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |
318 | return AddUntrackedKey(cf, key); | |
319 | } | |
320 | ||
// Merge keys are only recorded when merge operands are rolled back.
321 | Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { | |
322 | if (rollback_merge_operands_) { | |
323 | return AddUntrackedKey(cf, key); | |
324 | } | |
325 | return Status::OK(); | |
326 | } | |
327 | ||
328 | // The only expected 2PC marker is the initial Noop marker. | |
329 | Status MarkNoop(bool empty_batch) override { | |
330 | return empty_batch ? Status::OK() : Status::InvalidArgument(); | |
331 | } | |
332 | ||
333 | Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } | |
334 | ||
335 | Status MarkEndPrepare(const Slice&) override { | |
336 | return Status::InvalidArgument(); | |
337 | } | |
338 | ||
339 | Status MarkCommit(const Slice&) override { | |
340 | return Status::InvalidArgument(); | |
341 | } | |
342 | ||
343 | Status MarkRollback(const Slice&) override { | |
344 | return Status::InvalidArgument(); | |
345 | } | |
346 | }; | |
347 | ||
348 | UntrackedKeyHandler handler( | |
11fdf7f2 | 349 | this, wupt_db_->txn_db_options_.rollback_merge_operands); |
// NOTE(review): a failure of Iterate is only caught by the assert; in builds
// without asserts `s` is overwritten below.
f67539c2 | 350 | auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler); |
11fdf7f2 | 351 | assert(s.ok()); |
11fdf7f2 TL |
352 | |
353 | // TODO(lth): Reduce duplicate code with WritePrepared prepare logic. | |
354 | WriteOptions write_options = write_options_; | |
355 | write_options.disableWAL = false; | |
356 | const bool WRITE_AFTER_COMMIT = true; | |
// Must be evaluated before WriteImpl, since log_number_ is assigned after it.
494da23a | 357 | const bool first_prepare_batch = log_number_ == 0; |
11fdf7f2 | 358 | // MarkEndPrepare will change Noop marker to the appropriate marker. |
20effc67 TL |
359 | s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), |
360 | name_, !WRITE_AFTER_COMMIT, !prepared); | |
361 | assert(s.ok()); | |
11fdf7f2 TL |
362 | // For each duplicate key we account for a new sub-batch |
363 | prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); | |
364 | // AddPrepared is better called in the pre-release callback; otherwise there | |
365 | // is a non-zero chance of max advancing past prepare_seq and readers assuming | |
366 | // the data is committed. | |
367 | // Also having it in the PreReleaseCallback allows in-order addition of | |
494da23a TL |
368 | // prepared entries to PreparedHeap and hence enables an optimization. Refer |
369 | // to SmallestUnCommittedSeq for more details. | |
11fdf7f2 | 370 | AddPreparedCallback add_prepared_callback( |
494da23a TL |
371 | wpt_db_, db_impl_, prepare_batch_cnt_, |
372 | db_impl_->immutable_db_options().two_write_queues, first_prepare_batch); | |
11fdf7f2 TL |
373 | const bool DISABLE_MEMTABLE = true; |
374 | uint64_t seq_used = kMaxSequenceNumber; | |
375 | // log_number_ should refer to the oldest log containing uncommitted data | |
376 | // from the current transaction. This means that if log_number_ is set, | |
377 | // WriteImpl should not overwrite that value, so set log_used to nullptr if | |
378 | // log_number_ is already set. | |
// NOTE(review): the call below always passes &last_log_number_ (never
// nullptr); log_number_ is protected by the `== 0` check after the write.
11fdf7f2 | 379 | s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), |
f67539c2 TL |
380 | /*callback*/ nullptr, &last_log_number_, |
381 | /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used, | |
382 | prepare_batch_cnt_, &add_prepared_callback); | |
383 | if (log_number_ == 0) { | |
384 | log_number_ = last_log_number_; | |
385 | } | |
11fdf7f2 TL |
386 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
387 | auto prepare_seq = seq_used; | |
388 | ||
// NOTE(review): the bookkeeping below runs even when WriteImpl failed, in
// which case prepare_seq may still be kMaxSequenceNumber — confirm intended.
389 | // Only call SetId if it hasn't been set yet. | |
390 | if (GetId() == 0) { | |
391 | SetId(prepare_seq); | |
392 | } | |
393 | // unprep_seqs_ will also contain prepared seqnos since they are treated in | |
394 | // the same way in the prepare/commit callbacks. See the comment on the | |
395 | // definition of unprep_seqs_. | |
396 | unprep_seqs_[prepare_seq] = prepare_batch_cnt_; | |
397 | ||
398 | // Reset transaction state. | |
399 | if (!prepared) { | |
400 | prepare_batch_cnt_ = 0; | |
f67539c2 TL |
401 | const bool kClear = true; |
402 | TransactionBaseImpl::InitWriteBatch(kClear); | |
11fdf7f2 TL |
403 | } |
404 | ||
405 | return s; | |
406 | } | |
407 | ||
f67539c2 TL |
408 | Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() { |
409 | assert(unflushed_save_points_ != nullptr && | |
410 | unflushed_save_points_->size() > 0); | |
411 | assert(save_points_ != nullptr && save_points_->size() > 0); | |
412 | assert(save_points_->size() >= unflushed_save_points_->size()); | |
413 | ||
414 | // Handler class for creating an unprepared batch from a savepoint. | |
415 | struct SavePointBatchHandler : public WriteBatch::Handler { | |
416 | WriteBatchWithIndex* wb_; | |
417 | const std::map<uint32_t, ColumnFamilyHandle*>& handles_; | |
418 | ||
419 | SavePointBatchHandler( | |
420 | WriteBatchWithIndex* wb, | |
421 | const std::map<uint32_t, ColumnFamilyHandle*>& handles) | |
422 | : wb_(wb), handles_(handles) {} | |
423 | ||
424 | Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override { | |
425 | return wb_->Put(handles_.at(cf), key, value); | |
426 | } | |
427 | ||
428 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |
429 | return wb_->Delete(handles_.at(cf), key); | |
430 | } | |
431 | ||
432 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |
433 | return wb_->SingleDelete(handles_.at(cf), key); | |
434 | } | |
435 | ||
436 | Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override { | |
437 | return wb_->Merge(handles_.at(cf), key, value); | |
438 | } | |
439 | ||
440 | // The only expected 2PC marker is the initial Noop marker. | |
441 | Status MarkNoop(bool empty_batch) override { | |
442 | return empty_batch ? Status::OK() : Status::InvalidArgument(); | |
443 | } | |
444 | ||
445 | Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } | |
446 | ||
447 | Status MarkEndPrepare(const Slice&) override { | |
448 | return Status::InvalidArgument(); | |
449 | } | |
450 | ||
451 | Status MarkCommit(const Slice&) override { | |
452 | return Status::InvalidArgument(); | |
453 | } | |
454 | ||
455 | Status MarkRollback(const Slice&) override { | |
456 | return Status::InvalidArgument(); | |
457 | } | |
458 | }; | |
459 | ||
460 | // The comparator of the default cf is passed in, similar to the | |
461 | // initialization of TransactionBaseImpl::write_batch_. This comparator is | |
462 | // only used if the write batch encounters an invalid cf id, and falls back to | |
463 | // this comparator. | |
464 | WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0, | |
465 | true, 0); | |
466 | // Swap with write_batch_ so that wb contains the complete write batch. The | |
467 | // actual write batch that will be flushed to DB will be built in | |
468 | // write_batch_, and will be read by FlushWriteBatchToDBInternal. | |
469 | std::swap(wb, write_batch_); | |
470 | TransactionBaseImpl::InitWriteBatch(); | |
471 | ||
472 | size_t prev_boundary = WriteBatchInternal::kHeader; | |
473 | const bool kPrepared = true; | |
474 | for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) { | |
475 | bool trailing_batch = i == unflushed_save_points_->size(); | |
476 | SavePointBatchHandler sp_handler(&write_batch_, | |
477 | *wupt_db_->GetCFHandleMap().get()); | |
478 | size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize() | |
479 | : (*unflushed_save_points_)[i]; | |
480 | ||
481 | // Construct the partial write batch up to the savepoint. | |
482 | // | |
483 | // Theoretically, a memcpy between the write batches should be sufficient | |
484 | // since the rewriting into the batch should produce the exact same byte | |
485 | // representation. Rebuilding the WriteBatchWithIndex index is still | |
486 | // necessary though, and would imply doing two passes over the batch though. | |
487 | Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler, | |
488 | prev_boundary, curr_boundary); | |
489 | if (!s.ok()) { | |
490 | return s; | |
491 | } | |
492 | ||
493 | if (write_batch_.GetWriteBatch()->Count() > 0) { | |
494 | // Flush the write batch. | |
495 | s = FlushWriteBatchToDBInternal(!kPrepared); | |
496 | if (!s.ok()) { | |
497 | return s; | |
498 | } | |
499 | } | |
500 | ||
501 | if (!trailing_batch) { | |
502 | if (flushed_save_points_ == nullptr) { | |
503 | flushed_save_points_.reset( | |
504 | new autovector<WriteUnpreparedTxn::SavePoint>()); | |
505 | } | |
506 | flushed_save_points_->emplace_back( | |
507 | unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot())); | |
508 | } | |
509 | ||
510 | prev_boundary = curr_boundary; | |
511 | const bool kClear = true; | |
512 | TransactionBaseImpl::InitWriteBatch(kClear); | |
513 | } | |
514 | ||
515 | unflushed_save_points_->clear(); | |
516 | return Status::OK(); | |
517 | } | |
518 | ||
11fdf7f2 TL |
519 | Status WriteUnpreparedTxn::PrepareInternal() { |
520 | const bool kPrepared = true; | |
521 | return FlushWriteBatchToDB(kPrepared); | |
522 | } | |
523 | ||
524 | Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() { | |
525 | if (unprep_seqs_.empty()) { | |
526 | assert(log_number_ == 0); | |
527 | assert(GetId() == 0); | |
528 | return WritePreparedTxn::CommitWithoutPrepareInternal(); | |
529 | } | |
530 | ||
531 | // TODO(lth): We should optimize commit without prepare to not perform | |
532 | // a prepare under the hood. | |
533 | auto s = PrepareInternal(); | |
534 | if (!s.ok()) { | |
535 | return s; | |
536 | } | |
537 | return CommitInternal(); | |
538 | } | |
539 | ||
// Commits the transaction.
//
// A Commit marker is appended to the commit-time write batch, which is then
// written through WriteImpl. When a single write suffices (two_write_queues is
// off, or the commit-time batch carries no data for the memtable), the commit
// map is updated from that write's pre-release callback. Otherwise the first
// write only adds the commit batch as prepared, and a second write of an empty
// batch updates the commit map.
//
// On every return path unprep_seqs_ and both savepoint lists are cleared.
// Returns the status of the last WriteImpl call performed.
540 | Status WriteUnpreparedTxn::CommitInternal() { | |
541 | // TODO(lth): Reduce duplicate code with WritePrepared commit logic. | |
542 | ||
543 | // We take the commit-time batch and append the Commit marker. The Memtable | |
544 | // will ignore the Commit marker in non-recovery mode | |
545 | WriteBatch* working_batch = GetCommitTimeWriteBatch(); | |
// Captured before MarkCommit appends the Commit marker to the batch.
546 | const bool empty = working_batch->Count() == 0; | |
20effc67 TL |
547 | auto s = WriteBatchInternal::MarkCommit(working_batch, name_); |
548 | assert(s.ok()); | |
11fdf7f2 TL |
549 | |
550 | const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; | |
551 | if (!empty && for_recovery) { | |
552 | // When not writing to memtable, we can still cache the latest write batch. | |
553 | // The cached batch will be written to memtable in WriteRecoverableState | |
554 | // during FlushMemTable | |
555 | WriteBatchInternal::SetAsLastestPersistentState(working_batch); | |
556 | } | |
557 | ||
558 | const bool includes_data = !empty && !for_recovery; | |
559 | size_t commit_batch_cnt = 0; | |
560 | if (UNLIKELY(includes_data)) { | |
561 | ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, | |
562 | "Duplicate key overhead"); | |
// NOTE(review): the comparator map is dereferenced from the value returned by
// GetCFComparatorMap(); if that is a smart pointer returned by value, confirm
// SubBatchCounter does not rely on the temporary keeping the map alive.
563 | SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); | |
20effc67 | 564 | s = working_batch->Iterate(&counter); |
11fdf7f2 TL |
565 | assert(s.ok()); |
566 | commit_batch_cnt = counter.BatchCount(); | |
567 | } | |
568 | const bool disable_memtable = !includes_data; | |
569 | const bool do_one_write = | |
570 | !db_impl_->immutable_db_options().two_write_queues || disable_memtable; | |
f67539c2 | 571 | |
// Both callbacks are constructed up front; only one is handed to the first
// WriteImpl call, depending on do_one_write.
11fdf7f2 | 572 | WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map( |
f67539c2 TL |
573 | wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt); |
574 | const bool kFirstPrepareBatch = true; | |
575 | AddPreparedCallback add_prepared_callback( | |
576 | wpt_db_, db_impl_, commit_batch_cnt, | |
577 | db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch); | |
578 | PreReleaseCallback* pre_release_callback; | |
579 | if (do_one_write) { | |
580 | pre_release_callback = &update_commit_map; | |
581 | } else { | |
582 | pre_release_callback = &add_prepared_callback; | |
583 | } | |
11fdf7f2 | 584 | uint64_t seq_used = kMaxSequenceNumber; |
f67539c2 TL |
585 | // Since the prepared batch is directly written to memtable, there is |
586 | // already a connection between the memtable and its WAL, so there is no | |
587 | // need to redundantly reference the log that contains the prepared data. | |
11fdf7f2 TL |
588 | const uint64_t zero_log_number = 0ull; |
589 | size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; | |
20effc67 TL |
590 | s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, |
591 | zero_log_number, disable_memtable, &seq_used, | |
592 | batch_cnt, pre_release_callback); | |
11fdf7f2 | 593 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
f67539c2 | 594 | const SequenceNumber commit_batch_seq = seq_used; |
// Finish here when a single write was enough, or when the first write failed.
11fdf7f2 TL |
595 | if (LIKELY(do_one_write || !s.ok())) { |
596 | if (LIKELY(s.ok())) { | |
597 | // Note RemovePrepared should be called after WriteImpl that published | |
598 | // the seq. Otherwise SmallestUnCommittedSeq optimization breaks. | |
599 | for (const auto& seq : unprep_seqs_) { | |
600 | wpt_db_->RemovePrepared(seq.first, seq.second); | |
601 | } | |
602 | } | |
// Reached only when the first of two writes failed.
// NOTE(review): commit_batch_seq may still be kMaxSequenceNumber in that
// case — confirm RemovePrepared tolerates it.
f67539c2 TL |
603 | if (UNLIKELY(!do_one_write)) { |
604 | wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt); | |
605 | } | |
11fdf7f2 | 606 | unprep_seqs_.clear(); |
f67539c2 TL |
607 | flushed_save_points_.reset(nullptr); |
608 | unflushed_save_points_.reset(nullptr); | |
11fdf7f2 TL |
609 | return s; |
610 | } // else do the 2nd write to publish seq | |
f67539c2 TL |
611 | |
612 | // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the | |
613 | // commit write batch as just another "unprepared" batch. This will also | |
614 | // update the unprep_seqs_ in the update_commit_map callback. | |
615 | unprep_seqs_[commit_batch_seq] = commit_batch_cnt; | |
20effc67 TL |
616 | WriteUnpreparedCommitEntryPreReleaseCallback |
617 | update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0); | |
f67539c2 | 618 | |
11fdf7f2 TL |
619 | // Note: the 2nd write comes with a performance penalty. So if we have too |
620 | // many commits accompanied by CommitTimeWriteBatch and yet we cannot | |
621 | // enable use_only_the_last_commit_time_batch_for_recovery_ optimization, | |
622 | // two_write_queues should be disabled to avoid many additional writes here. | |
11fdf7f2 | 623 | |
f67539c2 | 624 | // Update commit map only from the 2nd queue |
11fdf7f2 | 625 | WriteBatch empty_batch; |
20effc67 TL |
626 | s = empty_batch.PutLogData(Slice()); |
627 | assert(s.ok()); | |
11fdf7f2 | 628 | // In the absence of Prepare markers, use Noop as a batch separator |
20effc67 TL |
629 | s = WriteBatchInternal::InsertNoop(&empty_batch); |
630 | assert(s.ok()); | |
11fdf7f2 TL |
631 | const bool DISABLE_MEMTABLE = true; |
632 | const size_t ONE_BATCH = 1; | |
633 | const uint64_t NO_REF_LOG = 0; | |
634 | s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, | |
635 | NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
20effc67 | 636 | &update_commit_map_with_commit_batch); |
11fdf7f2 TL |
637 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
638 | // Note RemovePrepared should be called after WriteImpl that published the | |
639 | // seq. Otherwise SmallestUnCommittedSeq optimization breaks. | |
640 | for (const auto& seq : unprep_seqs_) { | |
641 | wpt_db_->RemovePrepared(seq.first, seq.second); | |
642 | } | |
643 | unprep_seqs_.clear(); | |
f67539c2 TL |
644 | flushed_save_points_.reset(nullptr); |
645 | unflushed_save_points_.reset(nullptr); | |
11fdf7f2 TL |
646 | return s; |
647 | } | |
648 | ||
f67539c2 | 649 | Status WriteUnpreparedTxn::WriteRollbackKeys( |
20effc67 | 650 | const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch, |
f67539c2 | 651 | ReadCallback* callback, const ReadOptions& roptions) { |
20effc67 TL |
652 | // This assertion can be removed when range lock is supported. |
653 | assert(lock_tracker.IsPointLockSupported()); | |
f67539c2 TL |
654 | const auto& cf_map = *wupt_db_->GetCFHandleMap(); |
655 | auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) { | |
656 | const auto& cf_handle = cf_map.at(cfid); | |
657 | PinnableSlice pinnable_val; | |
658 | bool not_used; | |
659 | DBImpl::GetImplOptions get_impl_options; | |
660 | get_impl_options.column_family = cf_handle; | |
661 | get_impl_options.value = &pinnable_val; | |
662 | get_impl_options.value_found = ¬_used; | |
663 | get_impl_options.callback = callback; | |
664 | auto s = db_impl_->GetImpl(roptions, key, get_impl_options); | |
665 | ||
666 | if (s.ok()) { | |
667 | s = rollback_batch->Put(cf_handle, key, pinnable_val); | |
668 | assert(s.ok()); | |
669 | } else if (s.IsNotFound()) { | |
670 | s = rollback_batch->Delete(cf_handle, key); | |
671 | assert(s.ok()); | |
672 | } else { | |
673 | return s; | |
674 | } | |
675 | ||
676 | return Status::OK(); | |
677 | }; | |
678 | ||
20effc67 TL |
679 | std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it( |
680 | lock_tracker.GetColumnFamilyIterator()); | |
681 | assert(cf_it != nullptr); | |
682 | while (cf_it->HasNext()) { | |
683 | ColumnFamilyId cf = cf_it->Next(); | |
684 | std::unique_ptr<LockTracker::KeyIterator> key_it( | |
685 | lock_tracker.GetKeyIterator(cf)); | |
686 | assert(key_it != nullptr); | |
687 | while (key_it->HasNext()) { | |
688 | const std::string& key = key_it->Next(); | |
689 | auto s = WriteRollbackKey(key, cf); | |
f67539c2 TL |
690 | if (!s.ok()) { |
691 | return s; | |
692 | } | |
693 | } | |
694 | } | |
695 | ||
696 | for (const auto& cfkey : untracked_keys_) { | |
697 | const auto cfid = cfkey.first; | |
698 | const auto& keys = cfkey.second; | |
699 | for (const auto& key : keys) { | |
700 | auto s = WriteRollbackKey(key, cfid); | |
701 | if (!s.ok()) { | |
702 | return s; | |
703 | } | |
704 | } | |
705 | } | |
706 | ||
707 | return Status::OK(); | |
708 | } | |
709 | ||
11fdf7f2 TL |
710 | Status WriteUnpreparedTxn::RollbackInternal() { |
711 | // TODO(lth): Reduce duplicate code with WritePrepared rollback logic. | |
712 | WriteBatchWithIndex rollback_batch( | |
713 | wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); | |
714 | assert(GetId() != kMaxSequenceNumber); | |
715 | assert(GetId() > 0); | |
11fdf7f2 | 716 | Status s; |
f67539c2 | 717 | auto read_at_seq = kMaxSequenceNumber; |
11fdf7f2 | 718 | ReadOptions roptions; |
f67539c2 TL |
719 | // to prevent callback's seq from being overridden inside DBImpl::Get |
720 | roptions.snapshot = wpt_db_->GetMaxSnapshot(); | |
11fdf7f2 TL |
721 | // Note that we do not use WriteUnpreparedTxnReadCallback because we do not |
722 | // need to read our own writes when reading prior versions of the key for | |
723 | // rollback. | |
494da23a | 724 | WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq); |
20effc67 TL |
725 | // TODO(lth): We write rollback batch all in a single batch here, but this |
726 | // should be subdivded into multiple batches as well. In phase 2, when key | |
727 | // sets are read from WAL, this will happen naturally. | |
728 | s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions); | |
729 | if (!s.ok()) { | |
730 | return s; | |
731 | } | |
11fdf7f2 TL |
732 | |
733 | // The Rollback marker will be used as a batch separator | |
20effc67 TL |
734 | s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_); |
735 | assert(s.ok()); | |
11fdf7f2 TL |
736 | bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; |
737 | const bool DISABLE_MEMTABLE = true; | |
738 | const uint64_t NO_REF_LOG = 0; | |
739 | uint64_t seq_used = kMaxSequenceNumber; | |
20effc67 TL |
740 | // Rollback batch may contain duplicate keys, because tracked_keys_ is not |
741 | // comparator aware. | |
742 | auto rollback_batch_cnt = rollback_batch.SubBatchCnt(); | |
743 | // We commit the rolled back prepared batches. Although this is | |
11fdf7f2 TL |
744 | // counter-intuitive, i) it is safe to do so, since the prepared batches are |
745 | // already canceled out by the rollback batch, ii) adding the commit entry to | |
746 | // CommitCache will allow us to benefit from the existing mechanism in | |
747 | // CommitCache that keeps an entry evicted due to max advance and yet overlaps | |
748 | // with a live snapshot around so that the live snapshot properly skips the | |
749 | // entry even if its prepare seq is lower than max_evicted_seq_. | |
20effc67 TL |
750 | // |
751 | // TODO(lth): RollbackInternal is conceptually very similar to | |
752 | // CommitInternal, with the rollback batch simply taking on the role of | |
753 | // CommitTimeWriteBatch. We should be able to merge the two code paths. | |
11fdf7f2 | 754 | WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map( |
20effc67 | 755 | wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt); |
11fdf7f2 TL |
756 | // Note: the rollback batch does not need AddPrepared since it is written to |
757 | // DB in one shot. min_uncommitted still works since it requires capturing | |
20effc67 | 758 | // data that is written to DB but not yet committed, while the rollback |
11fdf7f2 TL |
759 | // batch commits with PreReleaseCallback. |
760 | s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(), | |
761 | nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE, | |
20effc67 | 762 | &seq_used, rollback_batch_cnt, |
11fdf7f2 TL |
763 | do_one_write ? &update_commit_map : nullptr); |
764 | assert(!s.ok() || seq_used != kMaxSequenceNumber); | |
765 | if (!s.ok()) { | |
766 | return s; | |
767 | } | |
768 | if (do_one_write) { | |
769 | for (const auto& seq : unprep_seqs_) { | |
770 | wpt_db_->RemovePrepared(seq.first, seq.second); | |
771 | } | |
772 | unprep_seqs_.clear(); | |
f67539c2 TL |
773 | flushed_save_points_.reset(nullptr); |
774 | unflushed_save_points_.reset(nullptr); | |
11fdf7f2 TL |
775 | return s; |
776 | } // else do the 2nd write for commit | |
20effc67 | 777 | |
11fdf7f2 | 778 | uint64_t& prepare_seq = seq_used; |
20effc67 TL |
779 | // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the |
780 | // rollback write batch as just another "unprepared" batch. This will also | |
781 | // update the unprep_seqs_ in the update_commit_map callback. | |
782 | unprep_seqs_[prepare_seq] = rollback_batch_cnt; | |
783 | WriteUnpreparedCommitEntryPreReleaseCallback | |
784 | update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0); | |
785 | ||
11fdf7f2 TL |
786 | ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, |
787 | "RollbackInternal 2nd write prepare_seq: %" PRIu64, | |
788 | prepare_seq); | |
11fdf7f2 | 789 | WriteBatch empty_batch; |
20effc67 TL |
790 | const size_t ONE_BATCH = 1; |
791 | s = empty_batch.PutLogData(Slice()); | |
792 | assert(s.ok()); | |
11fdf7f2 | 793 | // In the absence of Prepare markers, use Noop as a batch separator |
20effc67 TL |
794 | s = WriteBatchInternal::InsertNoop(&empty_batch); |
795 | assert(s.ok()); | |
11fdf7f2 TL |
796 | s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, |
797 | NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, | |
20effc67 | 798 | &update_commit_map_with_rollback_batch); |
11fdf7f2 TL |
799 | assert(!s.ok() || seq_used != kMaxSequenceNumber); |
800 | // Mark the txn as rolled back | |
11fdf7f2 | 801 | if (s.ok()) { |
11fdf7f2 TL |
802 | for (const auto& seq : unprep_seqs_) { |
803 | wpt_db_->RemovePrepared(seq.first, seq.second); | |
804 | } | |
805 | } | |
806 | ||
807 | unprep_seqs_.clear(); | |
f67539c2 TL |
808 | flushed_save_points_.reset(nullptr); |
809 | unflushed_save_points_.reset(nullptr); | |
810 | return s; | |
811 | } | |
812 | ||
813 | void WriteUnpreparedTxn::Clear() { | |
814 | if (!recovered_txn_) { | |
20effc67 | 815 | txn_db_impl_->UnLock(this, *tracked_locks_); |
f67539c2 TL |
816 | } |
817 | unprep_seqs_.clear(); | |
818 | flushed_save_points_.reset(nullptr); | |
819 | unflushed_save_points_.reset(nullptr); | |
820 | recovered_txn_ = false; | |
821 | largest_validated_seq_ = 0; | |
822 | assert(active_iterators_.empty()); | |
823 | active_iterators_.clear(); | |
824 | untracked_keys_.clear(); | |
825 | TransactionBaseImpl::Clear(); | |
826 | } | |
827 | ||
828 | void WriteUnpreparedTxn::SetSavePoint() { | |
829 | assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) + | |
830 | (flushed_save_points_ ? flushed_save_points_->size() : 0) == | |
831 | (save_points_ ? save_points_->size() : 0)); | |
832 | PessimisticTransaction::SetSavePoint(); | |
833 | if (unflushed_save_points_ == nullptr) { | |
834 | unflushed_save_points_.reset(new autovector<size_t>()); | |
835 | } | |
836 | unflushed_save_points_->push_back(write_batch_.GetDataSize()); | |
837 | } | |
838 | ||
839 | Status WriteUnpreparedTxn::RollbackToSavePoint() { | |
840 | assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) + | |
841 | (flushed_save_points_ ? flushed_save_points_->size() : 0) == | |
842 | (save_points_ ? save_points_->size() : 0)); | |
843 | if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) { | |
844 | Status s = PessimisticTransaction::RollbackToSavePoint(); | |
845 | assert(!s.IsNotFound()); | |
846 | unflushed_save_points_->pop_back(); | |
847 | return s; | |
848 | } | |
849 | ||
850 | if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) { | |
851 | return RollbackToSavePointInternal(); | |
852 | } | |
853 | ||
854 | return Status::NotFound(); | |
855 | } | |
856 | ||
857 | Status WriteUnpreparedTxn::RollbackToSavePointInternal() { | |
858 | Status s; | |
859 | ||
860 | const bool kClear = true; | |
861 | TransactionBaseImpl::InitWriteBatch(kClear); | |
862 | ||
863 | assert(flushed_save_points_->size() > 0); | |
864 | WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back(); | |
865 | ||
866 | assert(save_points_ != nullptr && save_points_->size() > 0); | |
20effc67 | 867 | const LockTracker& tracked_keys = *save_points_->top().new_locks_; |
f67539c2 TL |
868 | |
869 | ReadOptions roptions; | |
870 | roptions.snapshot = top.snapshot_->snapshot(); | |
871 | SequenceNumber min_uncommitted = | |
20effc67 | 872 | static_cast_with_check<const SnapshotImpl>(roptions.snapshot) |
f67539c2 TL |
873 | ->min_uncommitted_; |
874 | SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber(); | |
875 | WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, | |
876 | top.unprep_seqs_, | |
877 | kBackedByDBSnapshot); | |
20effc67 TL |
878 | s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions); |
879 | if (!s.ok()) { | |
880 | return s; | |
881 | } | |
f67539c2 TL |
882 | |
883 | const bool kPrepared = true; | |
884 | s = FlushWriteBatchToDBInternal(!kPrepared); | |
f67539c2 TL |
885 | if (!s.ok()) { |
886 | return s; | |
887 | } | |
888 | ||
889 | // PessimisticTransaction::RollbackToSavePoint will call also call | |
890 | // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and has | |
891 | // no savepoints because this savepoint has already been flushed. Work around | |
892 | // this by setting a fake savepoint. | |
893 | write_batch_.SetSavePoint(); | |
894 | s = PessimisticTransaction::RollbackToSavePoint(); | |
895 | assert(s.ok()); | |
896 | if (!s.ok()) { | |
897 | return s; | |
898 | } | |
899 | ||
900 | flushed_save_points_->pop_back(); | |
11fdf7f2 TL |
901 | return s; |
902 | } | |
903 | ||
f67539c2 TL |
904 | Status WriteUnpreparedTxn::PopSavePoint() { |
905 | assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) + | |
906 | (flushed_save_points_ ? flushed_save_points_->size() : 0) == | |
907 | (save_points_ ? save_points_->size() : 0)); | |
908 | if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) { | |
909 | Status s = PessimisticTransaction::PopSavePoint(); | |
910 | assert(!s.IsNotFound()); | |
911 | unflushed_save_points_->pop_back(); | |
912 | return s; | |
913 | } | |
914 | ||
915 | if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) { | |
916 | // PessimisticTransaction::PopSavePoint will call also call PopSavePoint on | |
917 | // write_batch_. However, write_batch_ is empty and has no savepoints | |
918 | // because this savepoint has already been flushed. Work around this by | |
919 | // setting a fake savepoint. | |
920 | write_batch_.SetSavePoint(); | |
921 | Status s = PessimisticTransaction::PopSavePoint(); | |
922 | assert(!s.IsNotFound()); | |
923 | flushed_save_points_->pop_back(); | |
924 | return s; | |
925 | } | |
926 | ||
927 | return Status::NotFound(); | |
928 | } | |
929 | ||
930 | void WriteUnpreparedTxn::MultiGet(const ReadOptions& options, | |
931 | ColumnFamilyHandle* column_family, | |
932 | const size_t num_keys, const Slice* keys, | |
933 | PinnableSlice* values, Status* statuses, | |
934 | const bool sorted_input) { | |
935 | SequenceNumber min_uncommitted, snap_seq; | |
936 | const SnapshotBackup backed_by_snapshot = | |
937 | wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); | |
938 | WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, | |
939 | unprep_seqs_, backed_by_snapshot); | |
940 | write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys, | |
941 | keys, values, statuses, sorted_input, | |
942 | &callback); | |
943 | if (UNLIKELY(!callback.valid() || | |
944 | !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { | |
945 | wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); | |
946 | for (size_t i = 0; i < num_keys; i++) { | |
947 | statuses[i] = Status::TryAgain(); | |
948 | } | |
949 | } | |
950 | } | |
951 | ||
11fdf7f2 TL |
952 | Status WriteUnpreparedTxn::Get(const ReadOptions& options, |
953 | ColumnFamilyHandle* column_family, | |
954 | const Slice& key, PinnableSlice* value) { | |
f67539c2 TL |
955 | SequenceNumber min_uncommitted, snap_seq; |
956 | const SnapshotBackup backed_by_snapshot = | |
957 | wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); | |
958 | WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, | |
959 | unprep_seqs_, backed_by_snapshot); | |
960 | auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, | |
961 | value, &callback); | |
962 | if (LIKELY(callback.valid() && | |
963 | wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { | |
964 | return res; | |
965 | } else { | |
966 | wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN); | |
967 | return Status::TryAgain(); | |
11fdf7f2 | 968 | } |
f67539c2 | 969 | } |
11fdf7f2 | 970 | |
f67539c2 TL |
971 | namespace { |
972 | static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) { | |
973 | auto txn = reinterpret_cast<WriteUnpreparedTxn*>(arg1); | |
974 | auto iter = reinterpret_cast<Iterator*>(arg2); | |
975 | txn->RemoveActiveIterator(iter); | |
11fdf7f2 | 976 | } |
f67539c2 | 977 | } // anonymous namespace |
11fdf7f2 TL |
978 | |
979 | Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { | |
980 | return GetIterator(options, wupt_db_->DefaultColumnFamily()); | |
981 | } | |
982 | ||
983 | Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options, | |
984 | ColumnFamilyHandle* column_family) { | |
985 | // Make sure to get iterator from WriteUnprepareTxnDB, not the root db. | |
986 | Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); | |
987 | assert(db_iter); | |
988 | ||
f67539c2 TL |
989 | auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter); |
990 | active_iterators_.push_back(iter); | |
991 | iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter); | |
992 | return iter; | |
993 | } | |
994 | ||
995 | Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, | |
996 | const Slice& key, | |
997 | SequenceNumber* tracked_at_seq) { | |
998 | // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic. | |
999 | assert(snapshot_); | |
1000 | ||
1001 | SequenceNumber min_uncommitted = | |
20effc67 | 1002 | static_cast_with_check<const SnapshotImpl>(snapshot_.get()) |
f67539c2 TL |
1003 | ->min_uncommitted_; |
1004 | SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); | |
1005 | // tracked_at_seq is either max or the last snapshot with which this key was | |
1006 | // trackeed so there is no need to apply the IsInSnapshot to this comparison | |
1007 | // here as tracked_at_seq is not a prepare seq. | |
1008 | if (*tracked_at_seq <= snap_seq) { | |
1009 | // If the key has been previous validated at a sequence number earlier | |
1010 | // than the curent snapshot's sequence number, we already know it has not | |
1011 | // been modified. | |
1012 | return Status::OK(); | |
1013 | } | |
1014 | ||
1015 | *tracked_at_seq = snap_seq; | |
1016 | ||
1017 | ColumnFamilyHandle* cfh = | |
1018 | column_family ? column_family : db_impl_->DefaultColumnFamily(); | |
1019 | ||
1020 | WriteUnpreparedTxnReadCallback snap_checker( | |
1021 | wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot); | |
1022 | return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), | |
1023 | snap_seq, false /* cache_only */, | |
1024 | &snap_checker, min_uncommitted); | |
11fdf7f2 TL |
1025 | } |
1026 | ||
1027 | const std::map<SequenceNumber, size_t>& | |
1028 | WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() { | |
1029 | return unprep_seqs_; | |
1030 | } | |
1031 | ||
f67539c2 | 1032 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
1033 | |
1034 | #endif // ROCKSDB_LITE |