// ceph/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
// (RocksDB sources bundled with Ceph; snapshot imported at Nautilus 14.2.1)
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
10 #endif
11
#include "utilities/transactions/write_unprepared_txn_db.h"

#include <memory>
#include <utility>

#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
15
16 namespace rocksdb {
17
18 // Instead of reconstructing a Transaction object, and calling rollback on it,
19 // we can be more efficient with RollbackRecoveredTransaction by skipping
20 // unnecessary steps (eg. updating CommitMap, reconstructing keyset)
21 Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
22 const DBImpl::RecoveredTransaction* rtxn) {
23 // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
24 assert(rtxn->unprepared_);
25 auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
26 auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
27 WriteOptions w_options;
28 // If we crash during recovery, we can just recalculate and rewrite the
29 // rollback batch.
30 w_options.disableWAL = true;
31
32 // Iterate starting with largest sequence number.
33 for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); it++) {
34 auto last_visible_txn = it->first - 1;
35 const auto& batch = it->second.batch_;
36 WriteBatch rollback_batch;
37
38 struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
39 DBImpl* db_;
40 ReadOptions roptions;
41 WritePreparedTxnReadCallback callback;
42 WriteBatch* rollback_batch_;
43 std::map<uint32_t, const Comparator*>& comparators_;
44 std::map<uint32_t, ColumnFamilyHandle*>& handles_;
45 using CFKeys = std::set<Slice, SetComparator>;
46 std::map<uint32_t, CFKeys> keys_;
47 bool rollback_merge_operands_;
48 RollbackWriteBatchBuilder(
49 DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
50 WriteBatch* dst_batch,
51 std::map<uint32_t, const Comparator*>& comparators,
52 std::map<uint32_t, ColumnFamilyHandle*>& handles,
53 bool rollback_merge_operands)
54 : db_(db),
55 callback(wpt_db, snap_seq,
56 0), // 0 disables min_uncommitted optimization
57 rollback_batch_(dst_batch),
58 comparators_(comparators),
59 handles_(handles),
60 rollback_merge_operands_(rollback_merge_operands) {}
61
62 Status Rollback(uint32_t cf, const Slice& key) {
63 Status s;
64 CFKeys& cf_keys = keys_[cf];
65 if (cf_keys.size() == 0) { // just inserted
66 auto cmp = comparators_[cf];
67 keys_[cf] = CFKeys(SetComparator(cmp));
68 }
69 auto res = cf_keys.insert(key);
70 if (res.second ==
71 false) { // second is false if a element already existed.
72 return s;
73 }
74
75 PinnableSlice pinnable_val;
76 bool not_used;
77 auto cf_handle = handles_[cf];
78 s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
79 &callback);
80 assert(s.ok() || s.IsNotFound());
81 if (s.ok()) {
82 s = rollback_batch_->Put(cf_handle, key, pinnable_val);
83 assert(s.ok());
84 } else if (s.IsNotFound()) {
85 // There has been no readable value before txn. By adding a delete we
86 // make sure that there will be none afterwards either.
87 s = rollback_batch_->Delete(cf_handle, key);
88 assert(s.ok());
89 } else {
90 // Unexpected status. Return it to the user.
91 }
92 return s;
93 }
94
95 Status PutCF(uint32_t cf, const Slice& key,
96 const Slice& /*val*/) override {
97 return Rollback(cf, key);
98 }
99
100 Status DeleteCF(uint32_t cf, const Slice& key) override {
101 return Rollback(cf, key);
102 }
103
104 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
105 return Rollback(cf, key);
106 }
107
108 Status MergeCF(uint32_t cf, const Slice& key,
109 const Slice& /*val*/) override {
110 if (rollback_merge_operands_) {
111 return Rollback(cf, key);
112 } else {
113 return Status::OK();
114 }
115 }
116
117 // Recovered batches do not contain 2PC markers.
118 Status MarkNoop(bool) override { return Status::InvalidArgument(); }
119 Status MarkBeginPrepare(bool) override {
120 return Status::InvalidArgument();
121 }
122 Status MarkEndPrepare(const Slice&) override {
123 return Status::InvalidArgument();
124 }
125 Status MarkCommit(const Slice&) override {
126 return Status::InvalidArgument();
127 }
128 Status MarkRollback(const Slice&) override {
129 return Status::InvalidArgument();
130 }
131 } rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch,
132 *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
133 txn_db_options_.rollback_merge_operands);
134
135 auto s = batch->Iterate(&rollback_handler);
136 if (!s.ok()) {
137 return s;
138 }
139
140 // The Rollback marker will be used as a batch separator
141 WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
142
143 const uint64_t kNoLogRef = 0;
144 const bool kDisableMemtable = true;
145 const size_t kOneBatch = 1;
146 uint64_t seq_used = kMaxSequenceNumber;
147 s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
148 kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
149 if (!s.ok()) {
150 return s;
151 }
152
153 // If two_write_queues, we must manually release the sequence number to
154 // readers.
155 if (db_impl_->immutable_db_options().two_write_queues) {
156 db_impl_->SetLastPublishedSequence(seq_used);
157 }
158 }
159
160 return Status::OK();
161 }
162
163 Status WriteUnpreparedTxnDB::Initialize(
164 const std::vector<size_t>& compaction_enabled_cf_indices,
165 const std::vector<ColumnFamilyHandle*>& handles) {
166 // TODO(lth): Reduce code duplication in this function.
167 auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
168 assert(dbimpl != nullptr);
169
170 db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
171 // A callback to commit a single sub-batch
172 class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
173 public:
174 explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
175 : db_(db) {}
176 virtual Status Callback(SequenceNumber commit_seq,
177 bool is_mem_disabled) override {
178 #ifdef NDEBUG
179 (void)is_mem_disabled;
180 #endif
181 assert(!is_mem_disabled);
182 db_->AddCommitted(commit_seq, commit_seq);
183 return Status::OK();
184 }
185
186 private:
187 WritePreparedTxnDB* db_;
188 };
189 db_impl_->SetRecoverableStatePreReleaseCallback(
190 new CommitSubBatchPreReleaseCallback(this));
191
192 // PessimisticTransactionDB::Initialize
193 for (auto cf_ptr : handles) {
194 AddColumnFamily(cf_ptr);
195 }
196 // Verify cf options
197 for (auto handle : handles) {
198 ColumnFamilyDescriptor cfd;
199 Status s = handle->GetDescriptor(&cfd);
200 if (!s.ok()) {
201 return s;
202 }
203 s = VerifyCFOptions(cfd.options);
204 if (!s.ok()) {
205 return s;
206 }
207 }
208
209 // Re-enable compaction for the column families that initially had
210 // compaction enabled.
211 std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
212 compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
213 for (auto index : compaction_enabled_cf_indices) {
214 compaction_enabled_cf_handles.push_back(handles[index]);
215 }
216
217 Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
218 if (!s.ok()) {
219 return s;
220 }
221
222 // create 'real' transactions from recovered shell transactions
223 auto rtxns = dbimpl->recovered_transactions();
224 for (auto rtxn : rtxns) {
225 auto recovered_trx = rtxn.second;
226 assert(recovered_trx);
227 assert(recovered_trx->batches_.size() >= 1);
228 assert(recovered_trx->name_.length());
229
230 // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
231 // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
232 // two iterations is required.
233 if (recovered_trx->unprepared_) {
234 continue;
235 }
236
237 WriteOptions w_options;
238 w_options.sync = true;
239 TransactionOptions t_options;
240
241 auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
242 auto first_seq = recovered_trx->batches_.begin()->first;
243 auto last_prepare_batch_cnt =
244 recovered_trx->batches_.begin()->second.batch_cnt_;
245
246 Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
247 assert(real_trx);
248 auto wupt =
249 static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
250
251 real_trx->SetLogNumber(first_log_number);
252 real_trx->SetId(first_seq);
253 s = real_trx->SetName(recovered_trx->name_);
254 if (!s.ok()) {
255 break;
256 }
257 wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;
258
259 for (auto batch : recovered_trx->batches_) {
260 const auto& seq = batch.first;
261 const auto& batch_info = batch.second;
262 auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
263 assert(batch_info.log_number_);
264
265 for (size_t i = 0; i < cnt; i++) {
266 AddPrepared(seq + i);
267 }
268 assert(wupt->unprep_seqs_.count(seq) == 0);
269 wupt->unprep_seqs_[seq] = cnt;
270 KeySetBuilder keyset_handler(wupt,
271 txn_db_options_.rollback_merge_operands);
272 s = batch_info.batch_->Iterate(&keyset_handler);
273 assert(s.ok());
274 if (!s.ok()) {
275 break;
276 }
277 }
278
279 wupt->write_batch_.Clear();
280 WriteBatchInternal::InsertNoop(wupt->write_batch_.GetWriteBatch());
281
282 real_trx->SetState(Transaction::PREPARED);
283 if (!s.ok()) {
284 break;
285 }
286 }
287
288 SequenceNumber prev_max = max_evicted_seq_;
289 SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
290 AdvanceMaxEvictedSeq(prev_max, last_seq);
291
292 // Rollback unprepared transactions.
293 for (auto rtxn : rtxns) {
294 auto recovered_trx = rtxn.second;
295 if (recovered_trx->unprepared_) {
296 s = RollbackRecoveredTransaction(recovered_trx);
297 if (!s.ok()) {
298 return s;
299 }
300 continue;
301 }
302 }
303
304 if (s.ok()) {
305 dbimpl->DeleteAllRecoveredTransactions();
306 }
307
308 return s;
309 }
310
311 Transaction* WriteUnpreparedTxnDB::BeginTransaction(
312 const WriteOptions& write_options, const TransactionOptions& txn_options,
313 Transaction* old_txn) {
314 if (old_txn != nullptr) {
315 ReinitializeTransaction(old_txn, write_options, txn_options);
316 return old_txn;
317 } else {
318 return new WriteUnpreparedTxn(this, write_options, txn_options);
319 }
320 }
321
322 // Struct to hold ownership of snapshot and read callback for iterator cleanup.
323 struct WriteUnpreparedTxnDB::IteratorState {
324 IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
325 std::shared_ptr<ManagedSnapshot> s,
326 SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
327 : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {}
328
329 WriteUnpreparedTxnReadCallback callback;
330 std::shared_ptr<ManagedSnapshot> snapshot;
331 };
332
333 namespace {
334 static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
335 delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
336 }
337 } // anonymous namespace
338
339 Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
340 ColumnFamilyHandle* column_family,
341 WriteUnpreparedTxn* txn) {
342 // TODO(lth): Refactor so that this logic is shared with WritePrepared.
343 constexpr bool ALLOW_BLOB = true;
344 constexpr bool ALLOW_REFRESH = true;
345 std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
346 SequenceNumber snapshot_seq;
347 SequenceNumber min_uncommitted = 0;
348 if (options.snapshot != nullptr) {
349 snapshot_seq = options.snapshot->GetSequenceNumber();
350 min_uncommitted =
351 static_cast_with_check<const SnapshotImpl, const Snapshot>(
352 options.snapshot)
353 ->min_uncommitted_;
354 } else {
355 auto* snapshot = GetSnapshot();
356 // We take a snapshot to make sure that the related data in the commit map
357 // are not deleted.
358 snapshot_seq = snapshot->GetSequenceNumber();
359 min_uncommitted =
360 static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
361 ->min_uncommitted_;
362 own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
363 }
364 assert(snapshot_seq != kMaxSequenceNumber);
365 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
366 auto* state =
367 new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
368 auto* db_iter =
369 db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
370 !ALLOW_BLOB, !ALLOW_REFRESH);
371 db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
372 return db_iter;
373 }
374
375 Status KeySetBuilder::PutCF(uint32_t cf, const Slice& key,
376 const Slice& /*val*/) {
377 txn_->UpdateWriteKeySet(cf, key);
378 return Status::OK();
379 }
380
381 Status KeySetBuilder::DeleteCF(uint32_t cf, const Slice& key) {
382 txn_->UpdateWriteKeySet(cf, key);
383 return Status::OK();
384 }
385
386 Status KeySetBuilder::SingleDeleteCF(uint32_t cf, const Slice& key) {
387 txn_->UpdateWriteKeySet(cf, key);
388 return Status::OK();
389 }
390
391 Status KeySetBuilder::MergeCF(uint32_t cf, const Slice& key,
392 const Slice& /*val*/) {
393 if (rollback_merge_operands_) {
394 txn_->UpdateWriteKeySet(cf, key);
395 }
396 return Status::OK();
397 }
398
399 } // namespace rocksdb
400 #endif // ROCKSDB_LITE