ceph/src/rocksdb/utilities/transactions/write_unprepared_txn.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_unprepared_txn.h"
#include "db/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

namespace rocksdb {

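// Returns true if the given sequence number is visible to this transaction:
// either it belongs to one of the transaction's own unprepared batches, or it
// is visible under the snapshot according to IsInSnapshot.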
bool WriteUnpreparedTxnReadCallback::IsVisible(SequenceNumber seq) {
  auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();

  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
  // the prepare_batch_cnt seq nums after it.
  //
  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
  // large.
  for (const auto& it : unprep_seqs) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }

  return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
}

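// Returns the largest sequence number covered by this transaction's
// unprepared batches, or 0 if there are none.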
SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() {
  auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();
  if (unprep_seqs.size()) {
    return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
  }

  return 0;
}

WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                       const WriteOptions& write_options,
                                       const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {
  max_write_batch_size_ = txn_options.max_write_batch_size;
  // We set max bytes to zero so that we don't get a memory limit error.
  // Instead of trying to keep the write batch strictly under the size limit,
  // write unprepared just flushes to the DB when the limit is exceeded, to
  // avoid having retry logic. This also allows very big key-value pairs that
  // exceed max bytes to succeed.
  write_batch_.SetMaxBytes(0);
}

WriteUnpreparedTxn::~WriteUnpreparedTxn() {
  if (!unprep_seqs_.empty()) {
    assert(log_number_ > 0);
    assert(GetId() > 0);
    assert(!name_.empty());

    // We should rollback regardless of GetState, but some unit tests that
    // test crash recovery run the destructor assuming that rollback does not
    // happen, so that rollback during recovery can be exercised.
    if (GetState() == STARTED) {
      auto s __attribute__((__unused__)) = RollbackInternal();
      // TODO(lth): Better error handling.
      assert(s.ok());
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }
  }
}

void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  max_write_batch_size_ = txn_options.max_write_batch_size;
  write_batch_.SetMaxBytes(0);
  unprep_seqs_.clear();
  write_set_keys_.clear();
}

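// Each write operation below first gives the accumulated write batch a chance
// to be flushed to the DB as an unprepared batch (see
// MaybeFlushWriteBatchToDB), and then delegates to the base implementation.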
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::Put(column_family, key, value);
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const SliceParts& key, const SliceParts& value) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::Put(column_family, key, value);
}

Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                 const Slice& key, const Slice& value) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::Merge(column_family, key, value);
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const Slice& key) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::Delete(column_family, key);
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::Delete(column_family, key);
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const Slice& key) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::SingleDelete(column_family, key);
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const SliceParts& key) {
  Status s = MaybeFlushWriteBatchToDB();
  if (!s.ok()) {
    return s;
  }
  return TransactionBaseImpl::SingleDelete(column_family, key);
}

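// Flushes the accumulated write batch to the DB as an unprepared batch once it
// grows past max_write_batch_size_, and, on the first such flush, marks the
// WAL as containing a prepare section so it is not deleted while the
// transaction still has unprepared data in it.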
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  const bool kPrepared = true;
  Status s;

  bool needs_mark = (log_number_ == 0);

  if (max_write_batch_size_ != 0 &&
      write_batch_.GetDataSize() > max_write_batch_size_) {
    assert(GetState() != PREPARED);
    s = FlushWriteBatchToDB(!kPrepared);
    if (s.ok()) {
      assert(log_number_ > 0);
      // This is done to prevent WAL files after log_number_ from being
      // deleted, because they could potentially contain unprepared batches.
      if (needs_mark) {
        dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
            log_number_);
      }
    }
  }
  return s;
}

void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) {
  // TODO(lth): write_set_keys_ can just be a std::string instead of a vector.
  write_set_keys_[cfid].push_back(key.ToString());
}

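// Writes the contents of the current write batch to the DB as a single
// (un)prepared batch: the keys are recorded in write_set_keys_ for rollback,
// the batch is terminated with an EndPrepare marker and written through
// WriteImpl directly into the WAL and memtable, and its sequence number is
// tracked in unprep_seqs_.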
Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
  if (name_.empty()) {
    return Status::InvalidArgument("Cannot write to DB without SetName.");
  }

  // Update write_set_keys_ for rollback purposes.
  KeySetBuilder keyset_handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  // MarkEndPrepare will change the Noop marker to the appropriate marker.
  WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
                                     !WRITE_AFTER_COMMIT, !prepared);
  // For each duplicate key we account for a new sub-batch.
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared is better called in the pre-release callback; otherwise there
  // is a non-zero chance of max advancing past prepare_seq, and readers would
  // then assume the data is committed.
  // Having it in the PreReleaseCallback also allows in-order addition of
  // prepared entries to PrepareHeap, which enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. This means that if log_number_ is set,
  // WriteImpl should not overwrite that value, so set log_used to nullptr if
  // log_number_ is already set.
  uint64_t* log_used = log_number_ ? nullptr : &log_number_;
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, log_used, /*log ref*/
                          0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
                          &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }
  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    write_batch_.Clear();
    WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
  }

  return s;
}

Status WriteUnpreparedTxn::PrepareInternal() {
  const bool kPrepared = true;
  return FlushWriteBatchToDB(kPrepared);
}

Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
  if (unprep_seqs_.empty()) {
    assert(log_number_ == 0);
    assert(GetId() == 0);
    return WritePreparedTxn::CommitWithoutPrepareInternal();
  }

  // TODO(lth): We should optimize commit without prepare to not perform
  // a prepare under the hood.
  auto s = PrepareInternal();
  if (!s.ok()) {
    return s;
  }
  return CommitInternal();
}

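// Commits all of the batches recorded in unprep_seqs_: the commit-time batch
// (if any) is written along with a Commit marker, and the commit map is
// updated through WriteUnpreparedCommitEntryPreReleaseCallback. With
// two_write_queues and a non-empty commit-time batch, a second write may be
// needed to publish the commit sequence.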
Status WriteUnpreparedTxn::CommitInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.

  // We take the commit-time batch and append the Commit marker. The memtable
  // will ignore the Commit marker in non-recovery mode.
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable.
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  const bool includes_data = !empty && !for_recovery;
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  const bool publish_seq = do_one_write;
  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written
  // to DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // CommitTimeWriteBatch commits with PreReleaseCallback.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt, publish_seq);
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, &update_commit_map);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      for (const auto& seq : unprep_seqs_) {
        wpt_db_->RemovePrepared(seq.first, seq.second);
      }
    }
    unprep_seqs_.clear();
    write_set_keys_.clear();
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by CommitTimeWriteBatch and yet we cannot enable
  // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
        : db_impl_(db_impl) {}
    virtual Status Callback(SequenceNumber seq, bool is_mem_disabled
                                __attribute__((__unused__))) override {
      assert(is_mem_disabled);
      assert(db_impl_->immutable_db_options().two_write_queues);
      db_impl_->SetLastPublishedSequence(seq);
      return Status::OK();
    }

   private:
    DBImpl* db_impl_;
  } publish_seq_callback(db_impl_);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use a Noop as a batch separator.
  WriteBatchInternal::InsertNoop(&empty_batch);
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &publish_seq_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note: RemovePrepared should be called after the WriteImpl that published
  // the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
  for (const auto& seq : unprep_seqs_) {
    wpt_db_->RemovePrepared(seq.first, seq.second);
  }
  unprep_seqs_.clear();
  write_set_keys_.clear();
  return s;
}

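// Rolls the transaction back by reading, for every key in write_set_keys_,
// the version of the key visible just before this transaction's first prepare
// sequence and writing it back (or a Delete if the key did not previously
// exist). The rolled-back batches are then marked as committed so that the
// CommitCache handles them correctly, as explained in the comments below.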
Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  const auto& cf_map = *wupt_db_->GetCFHandleMap();
  // In WritePrepared, the txn id is the same as the prepare seq.
  auto last_visible_txn = GetId() - 1;
  Status s;

  ReadOptions roptions;
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, last_visible_txn, 0);
  for (const auto& cfkey : write_set_keys_) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& key : keys) {
      const auto& cf_handle = cf_map.at(cfid);
      PinnableSlice pinnable_val;
      bool not_used;
      s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
                            &callback);

      if (s.ok()) {
        s = rollback_batch.Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        s = rollback_batch.Delete(cf_handle, key);
        assert(s.ok());
      } else {
        return s;
      }
    }
  }

  // The Rollback marker will be used as a batch separator.
  WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // TODO(lth): We write the rollback batch all in a single batch here, but
  // this should be subdivided into multiple batches as well. In phase 2, when
  // key sets are read from the WAL, this will happen naturally.
  const size_t ONE_BATCH = 1;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to CommitCache will allow us to benefit from the existing mechanism
  // in CommitCache that keeps an entry evicted due to max advance, yet
  // overlapping with a live snapshot, around so that the live snapshot
  // properly skips the entry even if its prepare seq is lower than
  // max_evicted_seq_.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
                          &seq_used, rollback_batch.SubBatchCnt(),
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    write_set_keys_.clear();
    return s;
  }  // else do the 2nd write for commit
  uint64_t& prepare_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use a Noop as a batch separator.
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back.
  uint64_t& rollback_seq = seq_used;
  if (s.ok()) {
    // Note: it is safe to do this after PreReleaseCallback via WriteImpl since
    // all the writes by the prepared batch are already blinded by the rollback
    // batch. The only reason we commit the prepared batch here is to benefit
    // from the existing mechanism in CommitCache that takes care of the rare
    // cases where the prepare seq is visible to a snapshot but max evicted seq
    // advances past that prepare seq.
    for (const auto& seq : unprep_seqs_) {
      for (size_t i = 0; i < seq.second; i++) {
        wpt_db_->AddCommitted(seq.first + i, rollback_seq);
      }
    }
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  write_set_keys_.clear();
  return s;
}

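// Reads through the transaction's own write batch first and then the DB,
// using WriteUnpreparedTxnReadCallback so that the transaction's unprepared
// batches that were already flushed to the DB remain visible to its own
// reads.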
Status WriteUnpreparedTxn::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  auto snapshot = options.snapshot;
  auto snap_seq =
      snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;  // by default disable the optimization
  if (snapshot != nullptr) {
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
  }

  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          this);
  return write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                        value, &callback);
}

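// Iterators are built on top of wupt_db_'s iterator so that unprepared data
// already flushed to the DB stays visible to this transaction, with the local
// write batch layered on top via NewIteratorWithBase.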
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
  return GetIterator(options, wupt_db_->DefaultColumnFamily());
}

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}

const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
  return unprep_seqs_;
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE