// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "utilities/transactions/write_prepared_txn.h"

#include <cinttypes>
#include <map>
#include <set>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

struct WriteOptions;

WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside the PessimisticTransaction constructor; otherwise
  // it would skip the overrides in WritePreparedTxn, which are not in place
  // yet while the base-class constructor runs.
  Initialize(txn_options);
}

void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}

void WritePreparedTxn::MultiGet(const ReadOptions& options,
                                ColumnFamilyHandle* column_family,
                                const size_t num_keys, const Slice* keys,
                                PinnableSlice* values, Status* statuses,
                                const bool sorted_input) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
                                      keys, values, statuses, sorted_input,
                                      &callback);
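  // If the callback was invalidated, or a snapshot that is not backed by an
  // actual db snapshot was rendered unusable by max_evicted_seq advancing past
  // it, the results cannot be trusted and the caller must retry.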
  if (UNLIKELY(!callback.valid() ||
               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WritePreparedTxn::Get(const ReadOptions& options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
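  // The lookup first consults the transaction's own write batch and then
  // falls back to the db, where the callback hides entries that are prepared
  // but not committed as of the snapshot.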
  Status res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                              pinnable_val, &callback);
  const bool callback_valid =
      callback.valid();  // NOTE: validity of callback must always be checked
                         // before it is destructed
  if (res.ok()) {
    if (!LIKELY(callback_valid &&
                wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
                                          backed_by_snapshot))) {
      wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
      res = Status::TryAgain();
    }
  }

  return res;
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
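  // The root db's iterator would expose prepared but uncommitted entries;
  // WritePreparedTxnDB::NewIterator installs a read callback that skips them.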
  Iterator* db_iter = wpt_db_->NewIterator(options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}

Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
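  // A prepared batch must reach the WAL regardless of the user's WAL setting;
  // otherwise the prepared state could not be recovered after a crash.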
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                              name_, !WRITE_AFTER_COMMIT);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
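  // Each sub-batch consumes its own sequence number, which keeps duplicate
  // keys distinguishable once they are inserted into the memtable.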
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
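  // Under the write-prepared policy the prepared data is inserted into the
  // memtable right away (hence !DISABLE_MEMTABLE); visibility is controlled
  // later through the commit cache rather than by delaying the write.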
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
                          !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
                          &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}

Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  // For each duplicate key we account for a new sub-batch
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}

Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
}

Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    if (for_recovery) {
      WriteBatchInternal::SetAsLatestPersistentState(working_batch);
    } else {
      return Status::InvalidArgument(
          "Commit-time-batch can only be used if "
          "use_only_the_last_commit_time_batch_for_recovery is true");
    }
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
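  // The pre-release callback publishes a prepare_seq -> commit_seq mapping to
  // the commit cache for every sub-batch of the prepared data (and of the
  // commit-time batch, if any) before the commit sequence becomes visible.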
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
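  // One-write path: update_commit_map marks the transaction committed as part
  // of this single write. Two-write path: the first write only registers the
  // commit-time batch as prepared; a second write below publishes the commit.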
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  // If `two_write_queues && includes_data`, then `do_one_write` is false. The
  // following `WriteImpl` will insert the data of the commit-time-batch into
  // the database before updating the commit cache. Therefore, the data of the
  // commit-time-batch is considered uncommitted. Furthermore, since data of
  // the commit-time-batch are not locked, it is possible for two uncommitted
  // versions of the same key to co-exist for a (short) period of time until
  // the commit cache is updated by the second write. If the two uncommitted
  // keys are compacted to the bottommost level in the meantime, it is possible
  // that compaction iterator will zero out the sequence numbers of both, thus
  // violating the invariant that an SST does not have two identical internal
  // keys. To prevent this situation, we should allow the usage of
  // commit-time-batch only if the user sets
  // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
  // true. See the comments about GetCommitTimeWriteBatch() in
  // include/rocksdb/utilities/transaction.h.
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
                 s.ok())) {
      // Note: RemovePrepared should be called after the WriteImpl that
      // published the seq. Otherwise the SmallestUnCommittedSeq optimization
      // breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }  // else RemovePrepared is called from within PreReleaseCallback
    if (UNLIKELY(!do_one_write)) {
      assert(!s.ok());
      // Cleanup the prepared entry we added with add_prepared_callback
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied by a CommitTimeWriteBatch and yet cannot enable
  // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  return s;
}

Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());

  assert(db_impl_);
  assert(wpt_db_);

  WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
                            write_options_.protection_bytes_per_key,
                            0 /* default_cf_ts_sz */);
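  // The rollback batch is built to contain the pre-transaction image of every
  // key touched by the prepared batch: the old value where one existed, and a
  // delete where none did.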
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  auto read_at_seq = kMaxSequenceNumber;
  ReadOptions roptions;
  // to prevent the callback's seq from being overridden inside DBImpl::Get
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* const db_;
    WritePreparedTxnDB* const wpt_db_;
    WritePreparedTxnReadCallback callback_;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    ReadOptions roptions_;

    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands, const ReadOptions& _roptions)
        : db_(db),
          wpt_db_(wpt_db),
          callback_(wpt_db, snap_seq),  // disable min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands),
          roptions_(_roptions) {}

    Status Rollback(uint32_t cf, const Slice& key) {
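      // Read the latest committed value of the key (the callback skips this
      // transaction's own prepared data) and write it back; if no committed
      // value exists, write a delete so the prepared write leaves no trace.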
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      // second is false if the element already existed.
      if (!it.second) {
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      DBImpl::GetImplOptions get_impl_options;
      get_impl_options.column_family = cf_handle;
      get_impl_options.value = &pinnable_val;
      get_impl_options.value_found = &not_used;
      get_impl_options.callback = &callback_;
      s = db_->GetImpl(roptions_, key, get_impl_options);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        if (wpt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
          s = rollback_batch_->SingleDelete(cf_handle, key);
        } else {
          s = rollback_batch_->Delete(cf_handle, key);
        }
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    Handler::OptionState WriteAfterCommit() const override {
      return Handler::OptionState::kDisabled;
    }
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands,
                     roptions);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  s = WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  const bool kFirstPrepareBatch = true;
  // We commit the rolled-back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, and ii) adding the commit
  // entry to the CommitCache lets us benefit from its existing mechanism that
  // keeps around an entry which was evicted due to max advance yet overlaps
  // with a live snapshot, so that the live snapshot properly skips the entry
  // even if its prepare seq is lower than max_evicted_seq_.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, ONE_BATCH,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    assert(!db_impl_->immutable_db_options().two_write_queues);
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t rollback_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
                    rollback_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal (status=%s) commit: %" PRIu64,
                    s.ToString().c_str(), GetId());
  // TODO(lth): For WriteUnprepared, where rollback is called frequently,
  // RemovePrepared could be moved to the callback to reduce lock contention.
  if (s.ok()) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }
  // Note: RemovePrepared for the prepared batch is called from within
  // PreReleaseCallback
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

  return s;
}

Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
                                            kBackedByDBSnapshot);
  // TODO(yanqin): support user-defined timestamp
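  // A non-OK status here (typically Busy) indicates the key was modified
  // after snap_seq, i.e. a write-write conflict with another transaction.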
  return TransactionUtil::CheckKeyForConflicts(
      db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
      false /* cache_only */, &snap_checker, min_uncommitted);
}

void WritePreparedTxn::SetSnapshot() {
  const bool kForWWConflictCheck = true;
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
  SetSnapshotInternal(snapshot);
}

Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  return ret;
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE