]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / transactions / write_unprepared_txn_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
10 #endif
11
12 #include "utilities/transactions/write_unprepared_txn_db.h"
13 #include "rocksdb/utilities/transaction_db.h"
14 #include "util/cast_util.h"
15
16 namespace rocksdb {
17
18 // Instead of reconstructing a Transaction object, and calling rollback on it,
19 // we can be more efficient with RollbackRecoveredTransaction by skipping
20 // unnecessary steps (eg. updating CommitMap, reconstructing keyset)
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  // Only WriteUnprepared-style recovered transactions are handled here.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  WriteOptions w_options;
  // If we crash during recovery, we can just recalculate and rewrite the
  // rollback batch.
  w_options.disableWAL = true;

  // Read callback used to compute pre-transaction values. It is seeded with a
  // plain sequence number rather than a real snapshot, so visibility is
  // decided by consulting the commit map via IsInSnapshot.
  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
        : ReadCallback(snapshot), db_(db) {}

    // Will be called to see if the seq number visible; if not it moves on to
    // the next seq number.
    inline bool IsVisibleFullCheck(SequenceNumber seq) override {
      // Becomes true if it cannot tell by comparing seq with snapshot seq since
      // the snapshot is not a real snapshot.
      auto snapshot = max_visible_seq_;
      bool released = false;
      auto ret = db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &released);
      assert(!released || ret);
      return ret;
    }

   private:
    WritePreparedTxnDB* db_;
  };

  // Iterate starting with largest sequence number.
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); it++) {
    // Only data with sequence numbers strictly below this batch's first
    // sequence is considered visible when reading back pre-images.
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch;

    // Builds the rollback batch: for every key written by the recovered batch
    // it reads the latest value visible below the transaction and re-Puts it,
    // or emits a Delete when no prior value existed.
    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      // Keys already handled within this batch, per column family, so each
      // key is rolled back at most once.
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
          WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(wpt_db, snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      // Appends to rollback_batch_ the operation that undoes the write to
      // (cf, key): a Put of the pre-transaction value, or a Delete when no
      // readable value predates the transaction.
      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // just inserted
          // First key seen for this cf: install the cf's user comparator.
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (res.second ==
            false) {  // second is false if a element already existed.
          return s;
        }

        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        // Read the value visible below the transaction using the callback.
        s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
                         &callback);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // There has been no readable value before txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      // Merge operands are only rolled back when the DB was configured with
      // rollback_merge_operands.
      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator
    WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
    // Write the rollback batch to the memtable only (WAL disabled above),
    // counted as a single sub-batch.
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}
183
Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)),
                    uint64_t) override {
      assert(!is_mem_disabled);
      // Marks the sub-batch committed at its own sequence number.
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations is required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt =
        static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    // NOTE(review): a SetName failure only breaks out of this loop; the
    // loop-local status is then discarded and Initialize proceeds as if
    // recovery succeeded. Confirm whether this is intentional best-effort.
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      break;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    // Re-register every sub-batch of the recovered transaction as prepared
    // and rebuild the transaction's tracked key set.
    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      // batch_cnt_ of 0 means the count is unknown; treat as one sub-batch.
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      for (size_t i = 0; i < cnt; i++) {
        AddPrepared(seq + i);
      }
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;
      KeySetBuilder keyset_handler(wupt,
                                   txn_db_options_.rollback_merge_operands);
      s = batch_info.batch_->Iterate(&keyset_handler);
      assert(s.ok());
      if (!s.ok()) {
        break;
      }
    }

    wupt->write_batch_.Clear();
    WriteBatchInternal::InsertNoop(wupt->write_batch_.GetWriteBatch());

    real_trx->SetState(Transaction::PREPARED);
    // NOTE(review): like the SetName case above, an Iterate failure exits the
    // loop but the error is not propagated to the caller.
    if (!s.ok()) {
      break;
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  // Compaction should start only after max_evicted_seq_ is set.
  Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
  if (!s.ok()) {
    return s;
  }

  // Rollback unprepared transactions.
  for (auto rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
      continue;
    }
  }

  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();
  }

  return s;
}
338
339 Transaction* WriteUnpreparedTxnDB::BeginTransaction(
340 const WriteOptions& write_options, const TransactionOptions& txn_options,
341 Transaction* old_txn) {
342 if (old_txn != nullptr) {
343 ReinitializeTransaction(old_txn, write_options, txn_options);
344 return old_txn;
345 } else {
346 return new WriteUnpreparedTxn(this, write_options, txn_options);
347 }
348 }
349
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  // Binds the read callback to (txn_db, sequence, min_uncommitted, txn) and
  // optionally keeps a ManagedSnapshot alive for the iterator's lifetime
  // (s may be null when the caller supplied its own snapshot).
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {}
  // Highest sequence number the callback may consider visible.
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};
361
362 namespace {
363 static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
364 delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
365 }
366 } // anonymous namespace
367
// Creates an iterator over column_family that sees the transaction's own
// unprepared writes (via WriteUnpreparedTxnReadCallback) in addition to data
// visible at the chosen snapshot.
Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot != nullptr) {
    // Caller-provided snapshot: use its sequence and min_uncommitted bound.
    snapshot_seq = options.snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot)
            ->min_uncommitted_;
  } else {
    auto* snapshot = GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    min_uncommitted =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
            ->min_uncommitted_;
    // Keep the snapshot alive for the iterator's lifetime via IteratorState.
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  // IteratorState owns the callback (and the snapshot, if we created one);
  // it is released by the cleanup function registered below.
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
                                &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}
403
404 Status KeySetBuilder::PutCF(uint32_t cf, const Slice& key,
405 const Slice& /*val*/) {
406 txn_->UpdateWriteKeySet(cf, key);
407 return Status::OK();
408 }
409
410 Status KeySetBuilder::DeleteCF(uint32_t cf, const Slice& key) {
411 txn_->UpdateWriteKeySet(cf, key);
412 return Status::OK();
413 }
414
415 Status KeySetBuilder::SingleDeleteCF(uint32_t cf, const Slice& key) {
416 txn_->UpdateWriteKeySet(cf, key);
417 return Status::OK();
418 }
419
420 Status KeySetBuilder::MergeCF(uint32_t cf, const Slice& key,
421 const Slice& /*val*/) {
422 if (rollback_merge_operands_) {
423 txn_->UpdateWriteKeySet(cf, key);
424 }
425 return Status::OK();
426 }
427
428 } // namespace rocksdb
429 #endif // ROCKSDB_LITE