1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
15 // kTypeValue varstring varstring
16 // kTypeDeletion varstring
17 // kTypeSingleDeletion varstring
18 // kTypeRangeDeletion varstring varstring
19 // kTypeMerge varstring varstring
20 // kTypeColumnFamilyValue varint32 varstring varstring
21 // kTypeColumnFamilyDeletion varint32 varstring
22 // kTypeColumnFamilySingleDeletion varint32 varstring
23 // kTypeColumnFamilyRangeDeletion varint32 varstring varstring
24 // kTypeColumnFamilyMerge varint32 varstring varstring
25 // kTypeBeginPrepareXID
26 // kTypeEndPrepareXID varstring
27 // kTypeCommitXID varstring
28 // kTypeCommitXIDAndTimestamp varstring varstring
29 // kTypeRollbackXID varstring
30 // kTypeBeginPersistedPrepareXID
31 // kTypeBeginUnprepareXID
32 // kTypeWideColumnEntity varstring varstring
33 // kTypeColumnFamilyWideColumnEntity varint32 varstring varstring
39 #include "rocksdb/write_batch.h"
46 #include <type_traits>
47 #include <unordered_map>
50 #include "db/column_family.h"
51 #include "db/db_impl/db_impl.h"
52 #include "db/dbformat.h"
53 #include "db/flush_scheduler.h"
54 #include "db/kv_checksum.h"
55 #include "db/memtable.h"
56 #include "db/merge_context.h"
57 #include "db/snapshot_impl.h"
58 #include "db/trim_history_scheduler.h"
59 #include "db/wide/wide_column_serialization.h"
60 #include "db/write_batch_internal.h"
61 #include "monitoring/perf_context_imp.h"
62 #include "monitoring/statistics.h"
63 #include "port/lang.h"
64 #include "rocksdb/merge_operator.h"
65 #include "rocksdb/system_clock.h"
66 #include "util/autovector.h"
67 #include "util/cast_util.h"
68 #include "util/coding.h"
69 #include "util/duplicate_detector.h"
70 #include "util/string_util.h"
72 namespace ROCKSDB_NAMESPACE
{
74 // anon namespace for file-local types
// Bit flags summarizing which record types a WriteBatch contains.
// DEFERRED means the flags have not been computed yet for a batch that was
// built from a raw serialized rep; ComputeContentFlags() resolves it by
// iterating the batch.  The missing enumerators (DEFERRED, HAS_PUT,
// HAS_DELETE, HAS_MERGE, HAS_COMMIT) are restored here: they are referenced
// throughout this file, and the surviving entries pin the bit positions.
enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
  HAS_PUT_ENTITY = 1 << 12,
};
93 struct BatchContentClassifier
: public WriteBatch::Handler
{
94 uint32_t content_flags
= 0;
96 Status
PutCF(uint32_t, const Slice
&, const Slice
&) override
{
97 content_flags
|= ContentFlags::HAS_PUT
;
101 Status
PutEntityCF(uint32_t /* column_family_id */, const Slice
& /* key */,
102 const Slice
& /* entity */) override
{
103 content_flags
|= ContentFlags::HAS_PUT_ENTITY
;
107 Status
DeleteCF(uint32_t, const Slice
&) override
{
108 content_flags
|= ContentFlags::HAS_DELETE
;
112 Status
SingleDeleteCF(uint32_t, const Slice
&) override
{
113 content_flags
|= ContentFlags::HAS_SINGLE_DELETE
;
117 Status
DeleteRangeCF(uint32_t, const Slice
&, const Slice
&) override
{
118 content_flags
|= ContentFlags::HAS_DELETE_RANGE
;
122 Status
MergeCF(uint32_t, const Slice
&, const Slice
&) override
{
123 content_flags
|= ContentFlags::HAS_MERGE
;
127 Status
PutBlobIndexCF(uint32_t, const Slice
&, const Slice
&) override
{
128 content_flags
|= ContentFlags::HAS_BLOB_INDEX
;
132 Status
MarkBeginPrepare(bool unprepare
) override
{
133 content_flags
|= ContentFlags::HAS_BEGIN_PREPARE
;
135 content_flags
|= ContentFlags::HAS_BEGIN_UNPREPARE
;
140 Status
MarkEndPrepare(const Slice
&) override
{
141 content_flags
|= ContentFlags::HAS_END_PREPARE
;
145 Status
MarkCommit(const Slice
&) override
{
146 content_flags
|= ContentFlags::HAS_COMMIT
;
150 Status
MarkCommitWithTimestamp(const Slice
&, const Slice
&) override
{
151 content_flags
|= ContentFlags::HAS_COMMIT
;
155 Status
MarkRollback(const Slice
&) override
{
156 content_flags
|= ContentFlags::HAS_ROLLBACK
;
161 } // anonymous namespace
164 std::stack
<SavePoint
, autovector
<SavePoint
>> stack
;
167 WriteBatch::WriteBatch(size_t reserved_bytes
, size_t max_bytes
,
168 size_t protection_bytes_per_key
, size_t default_cf_ts_sz
)
170 max_bytes_(max_bytes
),
171 default_cf_ts_sz_(default_cf_ts_sz
),
173 // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
175 assert(protection_bytes_per_key
== 0 || protection_bytes_per_key
== 8);
176 if (protection_bytes_per_key
!= 0) {
177 prot_info_
.reset(new WriteBatch::ProtectionInfo());
179 rep_
.reserve((reserved_bytes
> WriteBatchInternal::kHeader
)
181 : WriteBatchInternal::kHeader
);
182 rep_
.resize(WriteBatchInternal::kHeader
);
185 WriteBatch::WriteBatch(const std::string
& rep
)
186 : content_flags_(ContentFlags::DEFERRED
), max_bytes_(0), rep_(rep
) {}
188 WriteBatch::WriteBatch(std::string
&& rep
)
189 : content_flags_(ContentFlags::DEFERRED
),
191 rep_(std::move(rep
)) {}
193 WriteBatch::WriteBatch(const WriteBatch
& src
)
194 : wal_term_point_(src
.wal_term_point_
),
195 content_flags_(src
.content_flags_
.load(std::memory_order_relaxed
)),
196 max_bytes_(src
.max_bytes_
),
197 default_cf_ts_sz_(src
.default_cf_ts_sz_
),
199 if (src
.save_points_
!= nullptr) {
200 save_points_
.reset(new SavePoints());
201 save_points_
->stack
= src
.save_points_
->stack
;
203 if (src
.prot_info_
!= nullptr) {
204 prot_info_
.reset(new WriteBatch::ProtectionInfo());
205 prot_info_
->entries_
= src
.prot_info_
->entries_
;
209 WriteBatch::WriteBatch(WriteBatch
&& src
) noexcept
210 : save_points_(std::move(src
.save_points_
)),
211 wal_term_point_(std::move(src
.wal_term_point_
)),
212 content_flags_(src
.content_flags_
.load(std::memory_order_relaxed
)),
213 max_bytes_(src
.max_bytes_
),
214 prot_info_(std::move(src
.prot_info_
)),
215 default_cf_ts_sz_(src
.default_cf_ts_sz_
),
216 rep_(std::move(src
.rep_
)) {}
218 WriteBatch
& WriteBatch::operator=(const WriteBatch
& src
) {
221 new (this) WriteBatch(src
);
226 WriteBatch
& WriteBatch::operator=(WriteBatch
&& src
) {
229 new (this) WriteBatch(std::move(src
));
234 WriteBatch::~WriteBatch() {}
236 WriteBatch::Handler::~Handler() {}
238 void WriteBatch::Handler::LogData(const Slice
& /*blob*/) {
239 // If the user has not specified something to do with blobs, then we ignore
243 bool WriteBatch::Handler::Continue() { return true; }
245 void WriteBatch::Clear() {
247 rep_
.resize(WriteBatchInternal::kHeader
);
249 content_flags_
.store(0, std::memory_order_relaxed
);
251 if (save_points_
!= nullptr) {
252 while (!save_points_
->stack
.empty()) {
253 save_points_
->stack
.pop();
257 if (prot_info_
!= nullptr) {
258 prot_info_
->entries_
.clear();
260 wal_term_point_
.clear();
261 default_cf_ts_sz_
= 0;
264 uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }
266 uint32_t WriteBatch::ComputeContentFlags() const {
267 auto rv
= content_flags_
.load(std::memory_order_relaxed
);
268 if ((rv
& ContentFlags::DEFERRED
) != 0) {
269 BatchContentClassifier classifier
;
270 // Should we handle status here?
271 Iterate(&classifier
).PermitUncheckedError();
272 rv
= classifier
.content_flags
;
274 // this method is conceptually const, because it is performing a lazy
275 // computation that doesn't affect the abstract state of the batch.
276 // content_flags_ is marked mutable so that we can perform the
277 // following assignment
278 content_flags_
.store(rv
, std::memory_order_relaxed
);
283 void WriteBatch::MarkWalTerminationPoint() {
284 wal_term_point_
.size
= GetDataSize();
285 wal_term_point_
.count
= Count();
286 wal_term_point_
.content_flags
= content_flags_
;
289 size_t WriteBatch::GetProtectionBytesPerKey() const {
290 if (prot_info_
!= nullptr) {
291 return prot_info_
->GetBytesPerKey();
296 bool WriteBatch::HasPut() const {
297 return (ComputeContentFlags() & ContentFlags::HAS_PUT
) != 0;
300 bool WriteBatch::HasPutEntity() const {
301 return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY
) != 0;
304 bool WriteBatch::HasDelete() const {
305 return (ComputeContentFlags() & ContentFlags::HAS_DELETE
) != 0;
308 bool WriteBatch::HasSingleDelete() const {
309 return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE
) != 0;
312 bool WriteBatch::HasDeleteRange() const {
313 return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE
) != 0;
316 bool WriteBatch::HasMerge() const {
317 return (ComputeContentFlags() & ContentFlags::HAS_MERGE
) != 0;
320 bool ReadKeyFromWriteBatchEntry(Slice
* input
, Slice
* key
, bool cf_record
) {
321 assert(input
!= nullptr && key
!= nullptr);
323 input
->remove_prefix(1);
326 // Skip column_family bytes
328 if (!GetVarint32(input
, &cf
)) {
334 return GetLengthPrefixedSlice(input
, key
);
337 bool WriteBatch::HasBeginPrepare() const {
338 return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE
) != 0;
341 bool WriteBatch::HasEndPrepare() const {
342 return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE
) != 0;
345 bool WriteBatch::HasCommit() const {
346 return (ComputeContentFlags() & ContentFlags::HAS_COMMIT
) != 0;
349 bool WriteBatch::HasRollback() const {
350 return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK
) != 0;
353 Status
ReadRecordFromWriteBatch(Slice
* input
, char* tag
,
354 uint32_t* column_family
, Slice
* key
,
355 Slice
* value
, Slice
* blob
, Slice
* xid
) {
356 assert(key
!= nullptr && value
!= nullptr);
358 input
->remove_prefix(1);
359 *column_family
= 0; // default
361 case kTypeColumnFamilyValue
:
362 if (!GetVarint32(input
, column_family
)) {
363 return Status::Corruption("bad WriteBatch Put");
365 FALLTHROUGH_INTENDED
;
367 if (!GetLengthPrefixedSlice(input
, key
) ||
368 !GetLengthPrefixedSlice(input
, value
)) {
369 return Status::Corruption("bad WriteBatch Put");
372 case kTypeColumnFamilyDeletion
:
373 case kTypeColumnFamilySingleDeletion
:
374 if (!GetVarint32(input
, column_family
)) {
375 return Status::Corruption("bad WriteBatch Delete");
377 FALLTHROUGH_INTENDED
;
379 case kTypeSingleDeletion
:
380 if (!GetLengthPrefixedSlice(input
, key
)) {
381 return Status::Corruption("bad WriteBatch Delete");
384 case kTypeColumnFamilyRangeDeletion
:
385 if (!GetVarint32(input
, column_family
)) {
386 return Status::Corruption("bad WriteBatch DeleteRange");
388 FALLTHROUGH_INTENDED
;
389 case kTypeRangeDeletion
:
390 // for range delete, "key" is begin_key, "value" is end_key
391 if (!GetLengthPrefixedSlice(input
, key
) ||
392 !GetLengthPrefixedSlice(input
, value
)) {
393 return Status::Corruption("bad WriteBatch DeleteRange");
396 case kTypeColumnFamilyMerge
:
397 if (!GetVarint32(input
, column_family
)) {
398 return Status::Corruption("bad WriteBatch Merge");
400 FALLTHROUGH_INTENDED
;
402 if (!GetLengthPrefixedSlice(input
, key
) ||
403 !GetLengthPrefixedSlice(input
, value
)) {
404 return Status::Corruption("bad WriteBatch Merge");
407 case kTypeColumnFamilyBlobIndex
:
408 if (!GetVarint32(input
, column_family
)) {
409 return Status::Corruption("bad WriteBatch BlobIndex");
411 FALLTHROUGH_INTENDED
;
413 if (!GetLengthPrefixedSlice(input
, key
) ||
414 !GetLengthPrefixedSlice(input
, value
)) {
415 return Status::Corruption("bad WriteBatch BlobIndex");
419 assert(blob
!= nullptr);
420 if (!GetLengthPrefixedSlice(input
, blob
)) {
421 return Status::Corruption("bad WriteBatch Blob");
425 case kTypeBeginPrepareXID
:
426 // This indicates that the prepared batch is also persisted in the db.
427 // This is used in WritePreparedTxn
428 case kTypeBeginPersistedPrepareXID
:
429 // This is used in WriteUnpreparedTxn
430 case kTypeBeginUnprepareXID
:
432 case kTypeEndPrepareXID
:
433 if (!GetLengthPrefixedSlice(input
, xid
)) {
434 return Status::Corruption("bad EndPrepare XID");
437 case kTypeCommitXIDAndTimestamp
:
438 if (!GetLengthPrefixedSlice(input
, key
)) {
439 return Status::Corruption("bad commit timestamp");
441 FALLTHROUGH_INTENDED
;
443 if (!GetLengthPrefixedSlice(input
, xid
)) {
444 return Status::Corruption("bad Commit XID");
447 case kTypeRollbackXID
:
448 if (!GetLengthPrefixedSlice(input
, xid
)) {
449 return Status::Corruption("bad Rollback XID");
452 case kTypeColumnFamilyWideColumnEntity
:
453 if (!GetVarint32(input
, column_family
)) {
454 return Status::Corruption("bad WriteBatch PutEntity");
456 FALLTHROUGH_INTENDED
;
457 case kTypeWideColumnEntity
:
458 if (!GetLengthPrefixedSlice(input
, key
) ||
459 !GetLengthPrefixedSlice(input
, value
)) {
460 return Status::Corruption("bad WriteBatch PutEntity");
464 return Status::Corruption("unknown WriteBatch tag");
469 Status
WriteBatch::Iterate(Handler
* handler
) const {
470 if (rep_
.size() < WriteBatchInternal::kHeader
) {
471 return Status::Corruption("malformed WriteBatch (too small)");
474 return WriteBatchInternal::Iterate(this, handler
, WriteBatchInternal::kHeader
,
478 Status
WriteBatchInternal::Iterate(const WriteBatch
* wb
,
479 WriteBatch::Handler
* handler
, size_t begin
,
481 if (begin
> wb
->rep_
.size() || end
> wb
->rep_
.size() || end
< begin
) {
482 return Status::Corruption("Invalid start/end bounds for Iterate");
484 assert(begin
<= end
);
485 Slice
input(wb
->rep_
.data() + begin
, static_cast<size_t>(end
- begin
));
487 (begin
== WriteBatchInternal::kHeader
) && (end
== wb
->rep_
.size());
489 Slice key
, value
, blob
, xid
;
491 // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
492 // the batch boundary symbols otherwise we would mis-count the number of
493 // batches. We do that by checking whether the accumulated batch is empty
494 // before seeing the next Noop.
495 bool empty_batch
= true;
499 uint32_t column_family
= 0; // default
500 bool last_was_try_again
= false;
501 bool handler_continue
= true;
502 while (((s
.ok() && !input
.empty()) || UNLIKELY(s
.IsTryAgain()))) {
503 handler_continue
= handler
->Continue();
504 if (!handler_continue
) {
508 if (LIKELY(!s
.IsTryAgain())) {
509 last_was_try_again
= false;
511 column_family
= 0; // default
513 s
= ReadRecordFromWriteBatch(&input
, &tag
, &column_family
, &key
, &value
,
519 assert(s
.IsTryAgain());
520 assert(!last_was_try_again
); // to detect infinite loop bugs
521 if (UNLIKELY(last_was_try_again
)) {
522 return Status::Corruption(
523 "two consecutive TryAgain in WriteBatch handler; this is either a "
524 "software bug or data corruption.");
526 last_was_try_again
= true;
531 case kTypeColumnFamilyValue
:
533 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
534 (ContentFlags::DEFERRED
| ContentFlags::HAS_PUT
));
535 s
= handler
->PutCF(column_family
, key
, value
);
536 if (LIKELY(s
.ok())) {
541 case kTypeColumnFamilyDeletion
:
543 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
544 (ContentFlags::DEFERRED
| ContentFlags::HAS_DELETE
));
545 s
= handler
->DeleteCF(column_family
, key
);
546 if (LIKELY(s
.ok())) {
551 case kTypeColumnFamilySingleDeletion
:
552 case kTypeSingleDeletion
:
553 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
554 (ContentFlags::DEFERRED
| ContentFlags::HAS_SINGLE_DELETE
));
555 s
= handler
->SingleDeleteCF(column_family
, key
);
556 if (LIKELY(s
.ok())) {
561 case kTypeColumnFamilyRangeDeletion
:
562 case kTypeRangeDeletion
:
563 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
564 (ContentFlags::DEFERRED
| ContentFlags::HAS_DELETE_RANGE
));
565 s
= handler
->DeleteRangeCF(column_family
, key
, value
);
566 if (LIKELY(s
.ok())) {
571 case kTypeColumnFamilyMerge
:
573 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
574 (ContentFlags::DEFERRED
| ContentFlags::HAS_MERGE
));
575 s
= handler
->MergeCF(column_family
, key
, value
);
576 if (LIKELY(s
.ok())) {
581 case kTypeColumnFamilyBlobIndex
:
583 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
584 (ContentFlags::DEFERRED
| ContentFlags::HAS_BLOB_INDEX
));
585 s
= handler
->PutBlobIndexCF(column_family
, key
, value
);
586 if (LIKELY(s
.ok())) {
591 handler
->LogData(blob
);
592 // A batch might have nothing but LogData. It is still a batch.
595 case kTypeBeginPrepareXID
:
596 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
597 (ContentFlags::DEFERRED
| ContentFlags::HAS_BEGIN_PREPARE
));
598 s
= handler
->MarkBeginPrepare();
601 if (handler
->WriteAfterCommit() ==
602 WriteBatch::Handler::OptionState::kDisabled
) {
603 s
= Status::NotSupported(
604 "WriteCommitted txn tag when write_after_commit_ is disabled (in "
605 "WritePrepared/WriteUnprepared mode). If it is not due to "
606 "corruption, the WAL must be emptied before changing the "
609 if (handler
->WriteBeforePrepare() ==
610 WriteBatch::Handler::OptionState::kEnabled
) {
611 s
= Status::NotSupported(
612 "WriteCommitted txn tag when write_before_prepare_ is enabled "
613 "(in WriteUnprepared mode). If it is not due to corruption, the "
614 "WAL must be emptied before changing the WritePolicy.");
617 case kTypeBeginPersistedPrepareXID
:
618 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
619 (ContentFlags::DEFERRED
| ContentFlags::HAS_BEGIN_PREPARE
));
620 s
= handler
->MarkBeginPrepare();
623 if (handler
->WriteAfterCommit() ==
624 WriteBatch::Handler::OptionState::kEnabled
) {
625 s
= Status::NotSupported(
626 "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
627 "is enabled (in default WriteCommitted mode). If it is not due "
628 "to corruption, the WAL must be emptied before changing the "
632 case kTypeBeginUnprepareXID
:
633 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
634 (ContentFlags::DEFERRED
| ContentFlags::HAS_BEGIN_UNPREPARE
));
635 s
= handler
->MarkBeginPrepare(true /* unprepared */);
638 if (handler
->WriteAfterCommit() ==
639 WriteBatch::Handler::OptionState::kEnabled
) {
640 s
= Status::NotSupported(
641 "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
642 "default WriteCommitted mode). If it is not due to corruption, "
643 "the WAL must be emptied before changing the WritePolicy.");
645 if (handler
->WriteBeforePrepare() ==
646 WriteBatch::Handler::OptionState::kDisabled
) {
647 s
= Status::NotSupported(
648 "WriteUnprepared txn tag when write_before_prepare_ is disabled "
649 "(in WriteCommitted/WritePrepared mode). If it is not due to "
650 "corruption, the WAL must be emptied before changing the "
654 case kTypeEndPrepareXID
:
655 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
656 (ContentFlags::DEFERRED
| ContentFlags::HAS_END_PREPARE
));
657 s
= handler
->MarkEndPrepare(xid
);
662 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
663 (ContentFlags::DEFERRED
| ContentFlags::HAS_COMMIT
));
664 s
= handler
->MarkCommit(xid
);
668 case kTypeCommitXIDAndTimestamp
:
669 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
670 (ContentFlags::DEFERRED
| ContentFlags::HAS_COMMIT
));
671 // key stores the commit timestamp.
672 assert(!key
.empty());
673 s
= handler
->MarkCommitWithTimestamp(xid
, key
);
674 if (LIKELY(s
.ok())) {
678 case kTypeRollbackXID
:
679 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
680 (ContentFlags::DEFERRED
| ContentFlags::HAS_ROLLBACK
));
681 s
= handler
->MarkRollback(xid
);
686 s
= handler
->MarkNoop(empty_batch
);
690 case kTypeWideColumnEntity
:
691 case kTypeColumnFamilyWideColumnEntity
:
692 assert(wb
->content_flags_
.load(std::memory_order_relaxed
) &
693 (ContentFlags::DEFERRED
| ContentFlags::HAS_PUT_ENTITY
));
694 s
= handler
->PutEntityCF(column_family
, key
, value
);
695 if (LIKELY(s
.ok())) {
701 return Status::Corruption("unknown WriteBatch tag");
707 if (handler_continue
&& whole_batch
&&
708 found
!= WriteBatchInternal::Count(wb
)) {
709 return Status::Corruption("WriteBatch has wrong count");
715 bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch
* b
) {
716 return b
->is_latest_persistent_state_
;
719 void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch
* b
) {
720 b
->is_latest_persistent_state_
= true;
723 uint32_t WriteBatchInternal::Count(const WriteBatch
* b
) {
724 return DecodeFixed32(b
->rep_
.data() + 8);
727 void WriteBatchInternal::SetCount(WriteBatch
* b
, uint32_t n
) {
728 EncodeFixed32(&b
->rep_
[8], n
);
731 SequenceNumber
WriteBatchInternal::Sequence(const WriteBatch
* b
) {
732 return SequenceNumber(DecodeFixed64(b
->rep_
.data()));
735 void WriteBatchInternal::SetSequence(WriteBatch
* b
, SequenceNumber seq
) {
736 EncodeFixed64(&b
->rep_
[0], seq
);
739 size_t WriteBatchInternal::GetFirstOffset(WriteBatch
* /*b*/) {
740 return WriteBatchInternal::kHeader
;
743 std::tuple
<Status
, uint32_t, size_t>
744 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(
745 WriteBatch
* b
, ColumnFamilyHandle
* column_family
) {
746 uint32_t cf_id
= GetColumnFamilyID(column_family
);
750 const Comparator
* const ucmp
= column_family
->GetComparator();
752 ts_sz
= ucmp
->timestamp_size();
753 if (0 == cf_id
&& b
->default_cf_ts_sz_
!= ts_sz
) {
754 s
= Status::InvalidArgument("Default cf timestamp size mismatch");
757 } else if (b
->default_cf_ts_sz_
> 0) {
758 ts_sz
= b
->default_cf_ts_sz_
;
760 return std::make_tuple(s
, cf_id
, ts_sz
);
764 Status
CheckColumnFamilyTimestampSize(ColumnFamilyHandle
* column_family
,
766 if (!column_family
) {
767 return Status::InvalidArgument("column family handle cannot be null");
769 const Comparator
* const ucmp
= column_family
->GetComparator();
771 size_t cf_ts_sz
= ucmp
->timestamp_size();
773 return Status::InvalidArgument("timestamp disabled");
775 if (cf_ts_sz
!= ts
.size()) {
776 return Status::InvalidArgument("timestamp size mismatch");
780 } // anonymous namespace
782 Status
WriteBatchInternal::Put(WriteBatch
* b
, uint32_t column_family_id
,
783 const Slice
& key
, const Slice
& value
) {
784 if (key
.size() > size_t{std::numeric_limits
<uint32_t>::max()}) {
785 return Status::InvalidArgument("key is too large");
787 if (value
.size() > size_t{std::numeric_limits
<uint32_t>::max()}) {
788 return Status::InvalidArgument("value is too large");
791 LocalSavePoint
save(b
);
792 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
793 if (column_family_id
== 0) {
794 b
->rep_
.push_back(static_cast<char>(kTypeValue
));
796 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyValue
));
797 PutVarint32(&b
->rep_
, column_family_id
);
799 PutLengthPrefixedSlice(&b
->rep_
, key
);
800 PutLengthPrefixedSlice(&b
->rep_
, value
);
801 b
->content_flags_
.store(
802 b
->content_flags_
.load(std::memory_order_relaxed
) | ContentFlags::HAS_PUT
,
803 std::memory_order_relaxed
);
804 if (b
->prot_info_
!= nullptr) {
805 // Technically the optype could've been `kTypeColumnFamilyValue` with the
806 // CF ID encoded in the `WriteBatch`. That distinction is unimportant
807 // however since we verify CF ID is correct, as well as all other fields
808 // (a missing/extra encoded CF ID would corrupt another field). It is
809 // convenient to consolidate on `kTypeValue` here as that is what will be
810 // inserted into memtable.
811 b
->prot_info_
->entries_
.emplace_back(ProtectionInfo64()
812 .ProtectKVO(key
, value
, kTypeValue
)
813 .ProtectC(column_family_id
));
815 return save
.commit();
818 Status
WriteBatch::Put(ColumnFamilyHandle
* column_family
, const Slice
& key
,
819 const Slice
& value
) {
824 std::tie(s
, cf_id
, ts_sz
) =
825 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
833 return WriteBatchInternal::Put(this, cf_id
, key
, value
);
836 needs_in_place_update_ts_
= true;
837 has_key_with_ts_
= true;
838 std::string
dummy_ts(ts_sz
, '\0');
839 std::array
<Slice
, 2> key_with_ts
{{key
, dummy_ts
}};
840 return WriteBatchInternal::Put(this, cf_id
, SliceParts(key_with_ts
.data(), 2),
841 SliceParts(&value
, 1));
844 Status
WriteBatch::Put(ColumnFamilyHandle
* column_family
, const Slice
& key
,
845 const Slice
& ts
, const Slice
& value
) {
846 const Status s
= CheckColumnFamilyTimestampSize(column_family
, ts
);
850 has_key_with_ts_
= true;
851 assert(column_family
);
852 uint32_t cf_id
= column_family
->GetID();
853 std::array
<Slice
, 2> key_with_ts
{{key
, ts
}};
854 return WriteBatchInternal::Put(this, cf_id
, SliceParts(key_with_ts
.data(), 2),
855 SliceParts(&value
, 1));
858 Status
WriteBatchInternal::CheckSlicePartsLength(const SliceParts
& key
,
859 const SliceParts
& value
) {
860 size_t total_key_bytes
= 0;
861 for (int i
= 0; i
< key
.num_parts
; ++i
) {
862 total_key_bytes
+= key
.parts
[i
].size();
864 if (total_key_bytes
>= size_t{std::numeric_limits
<uint32_t>::max()}) {
865 return Status::InvalidArgument("key is too large");
868 size_t total_value_bytes
= 0;
869 for (int i
= 0; i
< value
.num_parts
; ++i
) {
870 total_value_bytes
+= value
.parts
[i
].size();
872 if (total_value_bytes
>= size_t{std::numeric_limits
<uint32_t>::max()}) {
873 return Status::InvalidArgument("value is too large");
878 Status
WriteBatchInternal::Put(WriteBatch
* b
, uint32_t column_family_id
,
879 const SliceParts
& key
, const SliceParts
& value
) {
880 Status s
= CheckSlicePartsLength(key
, value
);
885 LocalSavePoint
save(b
);
886 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
887 if (column_family_id
== 0) {
888 b
->rep_
.push_back(static_cast<char>(kTypeValue
));
890 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyValue
));
891 PutVarint32(&b
->rep_
, column_family_id
);
893 PutLengthPrefixedSliceParts(&b
->rep_
, key
);
894 PutLengthPrefixedSliceParts(&b
->rep_
, value
);
895 b
->content_flags_
.store(
896 b
->content_flags_
.load(std::memory_order_relaxed
) | ContentFlags::HAS_PUT
,
897 std::memory_order_relaxed
);
898 if (b
->prot_info_
!= nullptr) {
899 // See comment in first `WriteBatchInternal::Put()` overload concerning the
900 // `ValueType` argument passed to `ProtectKVO()`.
901 b
->prot_info_
->entries_
.emplace_back(ProtectionInfo64()
902 .ProtectKVO(key
, value
, kTypeValue
)
903 .ProtectC(column_family_id
));
905 return save
.commit();
908 Status
WriteBatch::Put(ColumnFamilyHandle
* column_family
, const SliceParts
& key
,
909 const SliceParts
& value
) {
914 std::tie(s
, cf_id
, ts_sz
) =
915 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
923 return WriteBatchInternal::Put(this, cf_id
, key
, value
);
926 return Status::InvalidArgument(
927 "Cannot call this method on column family enabling timestamp");
930 Status
WriteBatchInternal::PutEntity(WriteBatch
* b
, uint32_t column_family_id
,
932 const WideColumns
& columns
) {
935 if (key
.size() > size_t{std::numeric_limits
<uint32_t>::max()}) {
936 return Status::InvalidArgument("key is too large");
939 WideColumns
sorted_columns(columns
);
940 std::sort(sorted_columns
.begin(), sorted_columns
.end(),
941 [](const WideColumn
& lhs
, const WideColumn
& rhs
) {
942 return lhs
.name().compare(rhs
.name()) < 0;
946 const Status s
= WideColumnSerialization::Serialize(sorted_columns
, entity
);
951 if (entity
.size() > size_t{std::numeric_limits
<uint32_t>::max()}) {
952 return Status::InvalidArgument("wide column entity is too large");
955 LocalSavePoint
save(b
);
957 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
959 if (column_family_id
== 0) {
960 b
->rep_
.push_back(static_cast<char>(kTypeWideColumnEntity
));
962 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyWideColumnEntity
));
963 PutVarint32(&b
->rep_
, column_family_id
);
966 PutLengthPrefixedSlice(&b
->rep_
, key
);
967 PutLengthPrefixedSlice(&b
->rep_
, entity
);
969 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
970 ContentFlags::HAS_PUT_ENTITY
,
971 std::memory_order_relaxed
);
973 if (b
->prot_info_
!= nullptr) {
974 b
->prot_info_
->entries_
.emplace_back(
976 .ProtectKVO(key
, entity
, kTypeWideColumnEntity
)
977 .ProtectC(column_family_id
));
980 return save
.commit();
983 Status
WriteBatch::PutEntity(ColumnFamilyHandle
* column_family
,
984 const Slice
& key
, const WideColumns
& columns
) {
985 if (!column_family
) {
986 return Status::InvalidArgument(
987 "Cannot call this method without a column family handle");
994 std::tie(s
, cf_id
, ts_sz
) =
995 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1003 return Status::InvalidArgument(
1004 "Cannot call this method on column family enabling timestamp");
1007 return WriteBatchInternal::PutEntity(this, cf_id
, key
, columns
);
1010 Status
WriteBatchInternal::InsertNoop(WriteBatch
* b
) {
1011 b
->rep_
.push_back(static_cast<char>(kTypeNoop
));
1012 return Status::OK();
1015 Status
WriteBatchInternal::MarkEndPrepare(WriteBatch
* b
, const Slice
& xid
,
1016 bool write_after_commit
,
1017 bool unprepared_batch
) {
1018 // a manually constructed batch can only contain one prepare section
1019 assert(b
->rep_
[12] == static_cast<char>(kTypeNoop
));
1021 // all savepoints up to this point are cleared
1022 if (b
->save_points_
!= nullptr) {
1023 while (!b
->save_points_
->stack
.empty()) {
1024 b
->save_points_
->stack
.pop();
1028 // rewrite noop as begin marker
1029 b
->rep_
[12] = static_cast<char>(
1030 write_after_commit
? kTypeBeginPrepareXID
1031 : (unprepared_batch
? kTypeBeginUnprepareXID
1032 : kTypeBeginPersistedPrepareXID
));
1033 b
->rep_
.push_back(static_cast<char>(kTypeEndPrepareXID
));
1034 PutLengthPrefixedSlice(&b
->rep_
, xid
);
1035 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1036 ContentFlags::HAS_END_PREPARE
|
1037 ContentFlags::HAS_BEGIN_PREPARE
,
1038 std::memory_order_relaxed
);
1039 if (unprepared_batch
) {
1040 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1041 ContentFlags::HAS_BEGIN_UNPREPARE
,
1042 std::memory_order_relaxed
);
1044 return Status::OK();
1047 Status
WriteBatchInternal::MarkCommit(WriteBatch
* b
, const Slice
& xid
) {
1048 b
->rep_
.push_back(static_cast<char>(kTypeCommitXID
));
1049 PutLengthPrefixedSlice(&b
->rep_
, xid
);
1050 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1051 ContentFlags::HAS_COMMIT
,
1052 std::memory_order_relaxed
);
1053 return Status::OK();
1056 Status
WriteBatchInternal::MarkCommitWithTimestamp(WriteBatch
* b
,
1058 const Slice
& commit_ts
) {
1059 assert(!commit_ts
.empty());
1060 b
->rep_
.push_back(static_cast<char>(kTypeCommitXIDAndTimestamp
));
1061 PutLengthPrefixedSlice(&b
->rep_
, commit_ts
);
1062 PutLengthPrefixedSlice(&b
->rep_
, xid
);
1063 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1064 ContentFlags::HAS_COMMIT
,
1065 std::memory_order_relaxed
);
1066 return Status::OK();
1069 Status
WriteBatchInternal::MarkRollback(WriteBatch
* b
, const Slice
& xid
) {
1070 b
->rep_
.push_back(static_cast<char>(kTypeRollbackXID
));
1071 PutLengthPrefixedSlice(&b
->rep_
, xid
);
1072 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1073 ContentFlags::HAS_ROLLBACK
,
1074 std::memory_order_relaxed
);
1075 return Status::OK();
1078 Status
WriteBatchInternal::Delete(WriteBatch
* b
, uint32_t column_family_id
,
1080 LocalSavePoint
save(b
);
1081 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1082 if (column_family_id
== 0) {
1083 b
->rep_
.push_back(static_cast<char>(kTypeDeletion
));
1085 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyDeletion
));
1086 PutVarint32(&b
->rep_
, column_family_id
);
1088 PutLengthPrefixedSlice(&b
->rep_
, key
);
1089 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1090 ContentFlags::HAS_DELETE
,
1091 std::memory_order_relaxed
);
1092 if (b
->prot_info_
!= nullptr) {
1093 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1094 // `ValueType` argument passed to `ProtectKVO()`.
1095 b
->prot_info_
->entries_
.emplace_back(
1097 .ProtectKVO(key
, "" /* value */, kTypeDeletion
)
1098 .ProtectC(column_family_id
));
1100 return save
.commit();
1103 Status
WriteBatch::Delete(ColumnFamilyHandle
* column_family
, const Slice
& key
) {
1108 std::tie(s
, cf_id
, ts_sz
) =
1109 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1117 return WriteBatchInternal::Delete(this, cf_id
, key
);
1120 needs_in_place_update_ts_
= true;
1121 has_key_with_ts_
= true;
1122 std::string
dummy_ts(ts_sz
, '\0');
1123 std::array
<Slice
, 2> key_with_ts
{{key
, dummy_ts
}};
1124 return WriteBatchInternal::Delete(this, cf_id
,
1125 SliceParts(key_with_ts
.data(), 2));
1128 Status
WriteBatch::Delete(ColumnFamilyHandle
* column_family
, const Slice
& key
,
1130 const Status s
= CheckColumnFamilyTimestampSize(column_family
, ts
);
1134 assert(column_family
);
1135 has_key_with_ts_
= true;
1136 uint32_t cf_id
= column_family
->GetID();
1137 std::array
<Slice
, 2> key_with_ts
{{key
, ts
}};
1138 return WriteBatchInternal::Delete(this, cf_id
,
1139 SliceParts(key_with_ts
.data(), 2));
1142 Status
WriteBatchInternal::Delete(WriteBatch
* b
, uint32_t column_family_id
,
1143 const SliceParts
& key
) {
1144 LocalSavePoint
save(b
);
1145 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1146 if (column_family_id
== 0) {
1147 b
->rep_
.push_back(static_cast<char>(kTypeDeletion
));
1149 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyDeletion
));
1150 PutVarint32(&b
->rep_
, column_family_id
);
1152 PutLengthPrefixedSliceParts(&b
->rep_
, key
);
1153 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1154 ContentFlags::HAS_DELETE
,
1155 std::memory_order_relaxed
);
1156 if (b
->prot_info_
!= nullptr) {
1157 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1158 // `ValueType` argument passed to `ProtectKVO()`.
1159 b
->prot_info_
->entries_
.emplace_back(
1162 SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
1164 .ProtectC(column_family_id
));
1166 return save
.commit();
1169 Status
WriteBatch::Delete(ColumnFamilyHandle
* column_family
,
1170 const SliceParts
& key
) {
1175 std::tie(s
, cf_id
, ts_sz
) =
1176 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1184 return WriteBatchInternal::Delete(this, cf_id
, key
);
1187 return Status::InvalidArgument(
1188 "Cannot call this method on column family enabling timestamp");
1191 Status
WriteBatchInternal::SingleDelete(WriteBatch
* b
,
1192 uint32_t column_family_id
,
1194 LocalSavePoint
save(b
);
1195 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1196 if (column_family_id
== 0) {
1197 b
->rep_
.push_back(static_cast<char>(kTypeSingleDeletion
));
1199 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion
));
1200 PutVarint32(&b
->rep_
, column_family_id
);
1202 PutLengthPrefixedSlice(&b
->rep_
, key
);
1203 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1204 ContentFlags::HAS_SINGLE_DELETE
,
1205 std::memory_order_relaxed
);
1206 if (b
->prot_info_
!= nullptr) {
1207 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1208 // `ValueType` argument passed to `ProtectKVO()`.
1209 b
->prot_info_
->entries_
.emplace_back(
1211 .ProtectKVO(key
, "" /* value */, kTypeSingleDeletion
)
1212 .ProtectC(column_family_id
));
1214 return save
.commit();
1217 Status
WriteBatch::SingleDelete(ColumnFamilyHandle
* column_family
,
1223 std::tie(s
, cf_id
, ts_sz
) =
1224 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1232 return WriteBatchInternal::SingleDelete(this, cf_id
, key
);
1235 needs_in_place_update_ts_
= true;
1236 has_key_with_ts_
= true;
1237 std::string
dummy_ts(ts_sz
, '\0');
1238 std::array
<Slice
, 2> key_with_ts
{{key
, dummy_ts
}};
1239 return WriteBatchInternal::SingleDelete(this, cf_id
,
1240 SliceParts(key_with_ts
.data(), 2));
1243 Status
WriteBatch::SingleDelete(ColumnFamilyHandle
* column_family
,
1244 const Slice
& key
, const Slice
& ts
) {
1245 const Status s
= CheckColumnFamilyTimestampSize(column_family
, ts
);
1249 has_key_with_ts_
= true;
1250 assert(column_family
);
1251 uint32_t cf_id
= column_family
->GetID();
1252 std::array
<Slice
, 2> key_with_ts
{{key
, ts
}};
1253 return WriteBatchInternal::SingleDelete(this, cf_id
,
1254 SliceParts(key_with_ts
.data(), 2));
1257 Status
WriteBatchInternal::SingleDelete(WriteBatch
* b
,
1258 uint32_t column_family_id
,
1259 const SliceParts
& key
) {
1260 LocalSavePoint
save(b
);
1261 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1262 if (column_family_id
== 0) {
1263 b
->rep_
.push_back(static_cast<char>(kTypeSingleDeletion
));
1265 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion
));
1266 PutVarint32(&b
->rep_
, column_family_id
);
1268 PutLengthPrefixedSliceParts(&b
->rep_
, key
);
1269 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1270 ContentFlags::HAS_SINGLE_DELETE
,
1271 std::memory_order_relaxed
);
1272 if (b
->prot_info_
!= nullptr) {
1273 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1274 // `ValueType` argument passed to `ProtectKVO()`.
1275 b
->prot_info_
->entries_
.emplace_back(
1278 SliceParts(nullptr /* _parts */,
1279 0 /* _num_parts */) /* value */,
1280 kTypeSingleDeletion
)
1281 .ProtectC(column_family_id
));
1283 return save
.commit();
1286 Status
WriteBatch::SingleDelete(ColumnFamilyHandle
* column_family
,
1287 const SliceParts
& key
) {
1292 std::tie(s
, cf_id
, ts_sz
) =
1293 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1301 return WriteBatchInternal::SingleDelete(this, cf_id
, key
);
1304 return Status::InvalidArgument(
1305 "Cannot call this method on column family enabling timestamp");
1308 Status
WriteBatchInternal::DeleteRange(WriteBatch
* b
, uint32_t column_family_id
,
1309 const Slice
& begin_key
,
1310 const Slice
& end_key
) {
1311 LocalSavePoint
save(b
);
1312 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1313 if (column_family_id
== 0) {
1314 b
->rep_
.push_back(static_cast<char>(kTypeRangeDeletion
));
1316 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion
));
1317 PutVarint32(&b
->rep_
, column_family_id
);
1319 PutLengthPrefixedSlice(&b
->rep_
, begin_key
);
1320 PutLengthPrefixedSlice(&b
->rep_
, end_key
);
1321 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1322 ContentFlags::HAS_DELETE_RANGE
,
1323 std::memory_order_relaxed
);
1324 if (b
->prot_info_
!= nullptr) {
1325 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1326 // `ValueType` argument passed to `ProtectKVO()`.
1327 // In `DeleteRange()`, the end key is treated as the value.
1328 b
->prot_info_
->entries_
.emplace_back(
1330 .ProtectKVO(begin_key
, end_key
, kTypeRangeDeletion
)
1331 .ProtectC(column_family_id
));
1333 return save
.commit();
1336 Status
WriteBatch::DeleteRange(ColumnFamilyHandle
* column_family
,
1337 const Slice
& begin_key
, const Slice
& end_key
) {
1342 std::tie(s
, cf_id
, ts_sz
) =
1343 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1351 return WriteBatchInternal::DeleteRange(this, cf_id
, begin_key
, end_key
);
1354 needs_in_place_update_ts_
= true;
1355 has_key_with_ts_
= true;
1356 std::string
dummy_ts(ts_sz
, '\0');
1357 std::array
<Slice
, 2> begin_key_with_ts
{{begin_key
, dummy_ts
}};
1358 std::array
<Slice
, 2> end_key_with_ts
{{end_key
, dummy_ts
}};
1359 return WriteBatchInternal::DeleteRange(
1360 this, cf_id
, SliceParts(begin_key_with_ts
.data(), 2),
1361 SliceParts(end_key_with_ts
.data(), 2));
1364 Status
WriteBatch::DeleteRange(ColumnFamilyHandle
* column_family
,
1365 const Slice
& begin_key
, const Slice
& end_key
,
1367 const Status s
= CheckColumnFamilyTimestampSize(column_family
, ts
);
1371 assert(column_family
);
1372 has_key_with_ts_
= true;
1373 uint32_t cf_id
= column_family
->GetID();
1374 std::array
<Slice
, 2> key_with_ts
{{begin_key
, ts
}};
1375 std::array
<Slice
, 2> end_key_with_ts
{{end_key
, ts
}};
1376 return WriteBatchInternal::DeleteRange(this, cf_id
,
1377 SliceParts(key_with_ts
.data(), 2),
1378 SliceParts(end_key_with_ts
.data(), 2));
1381 Status
WriteBatchInternal::DeleteRange(WriteBatch
* b
, uint32_t column_family_id
,
1382 const SliceParts
& begin_key
,
1383 const SliceParts
& end_key
) {
1384 LocalSavePoint
save(b
);
1385 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1386 if (column_family_id
== 0) {
1387 b
->rep_
.push_back(static_cast<char>(kTypeRangeDeletion
));
1389 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion
));
1390 PutVarint32(&b
->rep_
, column_family_id
);
1392 PutLengthPrefixedSliceParts(&b
->rep_
, begin_key
);
1393 PutLengthPrefixedSliceParts(&b
->rep_
, end_key
);
1394 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1395 ContentFlags::HAS_DELETE_RANGE
,
1396 std::memory_order_relaxed
);
1397 if (b
->prot_info_
!= nullptr) {
1398 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1399 // `ValueType` argument passed to `ProtectKVO()`.
1400 // In `DeleteRange()`, the end key is treated as the value.
1401 b
->prot_info_
->entries_
.emplace_back(
1403 .ProtectKVO(begin_key
, end_key
, kTypeRangeDeletion
)
1404 .ProtectC(column_family_id
));
1406 return save
.commit();
1409 Status
WriteBatch::DeleteRange(ColumnFamilyHandle
* column_family
,
1410 const SliceParts
& begin_key
,
1411 const SliceParts
& end_key
) {
1416 std::tie(s
, cf_id
, ts_sz
) =
1417 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1425 return WriteBatchInternal::DeleteRange(this, cf_id
, begin_key
, end_key
);
1428 return Status::InvalidArgument(
1429 "Cannot call this method on column family enabling timestamp");
1432 Status
WriteBatchInternal::Merge(WriteBatch
* b
, uint32_t column_family_id
,
1433 const Slice
& key
, const Slice
& value
) {
1434 if (key
.size() > size_t{std::numeric_limits
<uint32_t>::max()}) {
1435 return Status::InvalidArgument("key is too large");
1437 if (value
.size() > size_t{std::numeric_limits
<uint32_t>::max()}) {
1438 return Status::InvalidArgument("value is too large");
1441 LocalSavePoint
save(b
);
1442 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1443 if (column_family_id
== 0) {
1444 b
->rep_
.push_back(static_cast<char>(kTypeMerge
));
1446 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyMerge
));
1447 PutVarint32(&b
->rep_
, column_family_id
);
1449 PutLengthPrefixedSlice(&b
->rep_
, key
);
1450 PutLengthPrefixedSlice(&b
->rep_
, value
);
1451 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1452 ContentFlags::HAS_MERGE
,
1453 std::memory_order_relaxed
);
1454 if (b
->prot_info_
!= nullptr) {
1455 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1456 // `ValueType` argument passed to `ProtectKVO()`.
1457 b
->prot_info_
->entries_
.emplace_back(ProtectionInfo64()
1458 .ProtectKVO(key
, value
, kTypeMerge
)
1459 .ProtectC(column_family_id
));
1461 return save
.commit();
1464 Status
WriteBatch::Merge(ColumnFamilyHandle
* column_family
, const Slice
& key
,
1465 const Slice
& value
) {
1470 std::tie(s
, cf_id
, ts_sz
) =
1471 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1479 return WriteBatchInternal::Merge(this, cf_id
, key
, value
);
1482 needs_in_place_update_ts_
= true;
1483 has_key_with_ts_
= true;
1484 std::string
dummy_ts(ts_sz
, '\0');
1485 std::array
<Slice
, 2> key_with_ts
{{key
, dummy_ts
}};
1487 return WriteBatchInternal::Merge(
1488 this, cf_id
, SliceParts(key_with_ts
.data(), 2), SliceParts(&value
, 1));
1491 Status
WriteBatch::Merge(ColumnFamilyHandle
* column_family
, const Slice
& key
,
1492 const Slice
& ts
, const Slice
& value
) {
1493 const Status s
= CheckColumnFamilyTimestampSize(column_family
, ts
);
1497 has_key_with_ts_
= true;
1498 assert(column_family
);
1499 uint32_t cf_id
= column_family
->GetID();
1500 std::array
<Slice
, 2> key_with_ts
{{key
, ts
}};
1501 return WriteBatchInternal::Merge(
1502 this, cf_id
, SliceParts(key_with_ts
.data(), 2), SliceParts(&value
, 1));
1505 Status
WriteBatchInternal::Merge(WriteBatch
* b
, uint32_t column_family_id
,
1506 const SliceParts
& key
,
1507 const SliceParts
& value
) {
1508 Status s
= CheckSlicePartsLength(key
, value
);
1513 LocalSavePoint
save(b
);
1514 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1515 if (column_family_id
== 0) {
1516 b
->rep_
.push_back(static_cast<char>(kTypeMerge
));
1518 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyMerge
));
1519 PutVarint32(&b
->rep_
, column_family_id
);
1521 PutLengthPrefixedSliceParts(&b
->rep_
, key
);
1522 PutLengthPrefixedSliceParts(&b
->rep_
, value
);
1523 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1524 ContentFlags::HAS_MERGE
,
1525 std::memory_order_relaxed
);
1526 if (b
->prot_info_
!= nullptr) {
1527 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1528 // `ValueType` argument passed to `ProtectKVO()`.
1529 b
->prot_info_
->entries_
.emplace_back(ProtectionInfo64()
1530 .ProtectKVO(key
, value
, kTypeMerge
)
1531 .ProtectC(column_family_id
));
1533 return save
.commit();
1536 Status
WriteBatch::Merge(ColumnFamilyHandle
* column_family
,
1537 const SliceParts
& key
, const SliceParts
& value
) {
1542 std::tie(s
, cf_id
, ts_sz
) =
1543 WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
1551 return WriteBatchInternal::Merge(this, cf_id
, key
, value
);
1554 return Status::InvalidArgument(
1555 "Cannot call this method on column family enabling timestamp");
1558 Status
WriteBatchInternal::PutBlobIndex(WriteBatch
* b
,
1559 uint32_t column_family_id
,
1560 const Slice
& key
, const Slice
& value
) {
1561 LocalSavePoint
save(b
);
1562 WriteBatchInternal::SetCount(b
, WriteBatchInternal::Count(b
) + 1);
1563 if (column_family_id
== 0) {
1564 b
->rep_
.push_back(static_cast<char>(kTypeBlobIndex
));
1566 b
->rep_
.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex
));
1567 PutVarint32(&b
->rep_
, column_family_id
);
1569 PutLengthPrefixedSlice(&b
->rep_
, key
);
1570 PutLengthPrefixedSlice(&b
->rep_
, value
);
1571 b
->content_flags_
.store(b
->content_flags_
.load(std::memory_order_relaxed
) |
1572 ContentFlags::HAS_BLOB_INDEX
,
1573 std::memory_order_relaxed
);
1574 if (b
->prot_info_
!= nullptr) {
1575 // See comment in first `WriteBatchInternal::Put()` overload concerning the
1576 // `ValueType` argument passed to `ProtectKVO()`.
1577 b
->prot_info_
->entries_
.emplace_back(
1579 .ProtectKVO(key
, value
, kTypeBlobIndex
)
1580 .ProtectC(column_family_id
));
1582 return save
.commit();
1585 Status
WriteBatch::PutLogData(const Slice
& blob
) {
1586 LocalSavePoint
save(this);
1587 rep_
.push_back(static_cast<char>(kTypeLogData
));
1588 PutLengthPrefixedSlice(&rep_
, blob
);
1589 return save
.commit();
1592 void WriteBatch::SetSavePoint() {
1593 if (save_points_
== nullptr) {
1594 save_points_
.reset(new SavePoints());
1596 // Record length and count of current batch of writes.
1597 save_points_
->stack
.push(SavePoint(
1598 GetDataSize(), Count(), content_flags_
.load(std::memory_order_relaxed
)));
1601 Status
WriteBatch::RollbackToSavePoint() {
1602 if (save_points_
== nullptr || save_points_
->stack
.size() == 0) {
1603 return Status::NotFound();
1606 // Pop the most recent savepoint off the stack
1607 SavePoint savepoint
= save_points_
->stack
.top();
1608 save_points_
->stack
.pop();
1610 assert(savepoint
.size
<= rep_
.size());
1611 assert(static_cast<uint32_t>(savepoint
.count
) <= Count());
1613 if (savepoint
.size
== rep_
.size()) {
1614 // No changes to rollback
1615 } else if (savepoint
.size
== 0) {
1616 // Rollback everything
1619 rep_
.resize(savepoint
.size
);
1620 if (prot_info_
!= nullptr) {
1621 prot_info_
->entries_
.resize(savepoint
.count
);
1623 WriteBatchInternal::SetCount(this, savepoint
.count
);
1624 content_flags_
.store(savepoint
.content_flags
, std::memory_order_relaxed
);
1627 return Status::OK();
1630 Status
WriteBatch::PopSavePoint() {
1631 if (save_points_
== nullptr || save_points_
->stack
.size() == 0) {
1632 return Status::NotFound();
1635 // Pop the most recent savepoint off the stack
1636 save_points_
->stack
.pop();
1638 return Status::OK();
1641 Status
WriteBatch::UpdateTimestamps(
1642 const Slice
& ts
, std::function
<size_t(uint32_t)> ts_sz_func
) {
1643 TimestampUpdater
<decltype(ts_sz_func
)> ts_updater(prot_info_
.get(),
1644 std::move(ts_sz_func
), ts
);
1645 const Status s
= Iterate(&ts_updater
);
1647 needs_in_place_update_ts_
= false;
1652 Status
WriteBatch::VerifyChecksum() const {
1653 if (prot_info_
== nullptr) {
1654 return Status::OK();
1656 Slice
input(rep_
.data() + WriteBatchInternal::kHeader
,
1657 rep_
.size() - WriteBatchInternal::kHeader
);
1658 Slice key
, value
, blob
, xid
;
1660 uint32_t column_family
= 0; // default
1662 size_t prot_info_idx
= 0;
1663 bool checksum_protected
= true;
1664 while (!input
.empty() && prot_info_idx
< prot_info_
->entries_
.size()) {
1665 // In case key/value/column_family are not updated by
1666 // ReadRecordFromWriteBatch
1670 s
= ReadRecordFromWriteBatch(&input
, &tag
, &column_family
, &key
, &value
,
1675 checksum_protected
= true;
1676 // Write batch checksum uses op_type without ColumnFamily (e.g., if op_type
1677 // in the write batch is kTypeColumnFamilyValue, kTypeValue is used to
1678 // compute the checksum), and encodes column family id separately. See
1679 // comment in first `WriteBatchInternal::Put()` for more detail.
1681 case kTypeColumnFamilyValue
:
1685 case kTypeColumnFamilyDeletion
:
1687 tag
= kTypeDeletion
;
1689 case kTypeColumnFamilySingleDeletion
:
1690 case kTypeSingleDeletion
:
1691 tag
= kTypeSingleDeletion
;
1693 case kTypeColumnFamilyRangeDeletion
:
1694 case kTypeRangeDeletion
:
1695 tag
= kTypeRangeDeletion
;
1697 case kTypeColumnFamilyMerge
:
1701 case kTypeColumnFamilyBlobIndex
:
1702 case kTypeBlobIndex
:
1703 tag
= kTypeBlobIndex
;
1706 case kTypeBeginPrepareXID
:
1707 case kTypeEndPrepareXID
:
1708 case kTypeCommitXID
:
1709 case kTypeRollbackXID
:
1711 case kTypeBeginPersistedPrepareXID
:
1712 case kTypeBeginUnprepareXID
:
1713 case kTypeDeletionWithTimestamp
:
1714 case kTypeCommitXIDAndTimestamp
:
1715 checksum_protected
= false;
1717 case kTypeColumnFamilyWideColumnEntity
:
1718 case kTypeWideColumnEntity
:
1719 tag
= kTypeWideColumnEntity
;
1722 return Status::Corruption(
1723 "unknown WriteBatch tag",
1724 std::to_string(static_cast<unsigned int>(tag
)));
1726 if (checksum_protected
) {
1727 s
= prot_info_
->entries_
[prot_info_idx
++]
1728 .StripC(column_family
)
1729 .StripKVO(key
, value
, static_cast<ValueType
>(tag
))
1737 if (prot_info_idx
!= WriteBatchInternal::Count(this)) {
1738 return Status::Corruption("WriteBatch has wrong count");
1740 assert(WriteBatchInternal::Count(this) == prot_info_
->entries_
.size());
1741 return Status::OK();
1746 class MemTableInserter
: public WriteBatch::Handler
{
1747 SequenceNumber sequence_
;
1748 ColumnFamilyMemTables
* const cf_mems_
;
1749 FlushScheduler
* const flush_scheduler_
;
1750 TrimHistoryScheduler
* const trim_history_scheduler_
;
1751 const bool ignore_missing_column_families_
;
1752 const uint64_t recovering_log_number_
;
1753 // log number that all Memtables inserted into should reference
1754 uint64_t log_number_ref_
;
1756 const bool concurrent_memtable_writes_
;
1757 bool post_info_created_
;
1758 const WriteBatch::ProtectionInfo
* prot_info_
;
1759 size_t prot_info_idx_
;
1761 bool* has_valid_writes_
;
1762 // On some (!) platforms just default creating
1763 // a map is too expensive in the Write() path as they
1764 // cause memory allocations though unused.
1765 // Make creation optional but do not incur
1766 // std::unique_ptr additional allocation
1767 using MemPostInfoMap
= std::map
<MemTable
*, MemTablePostProcessInfo
>;
1768 using PostMapType
= std::aligned_storage
<sizeof(MemPostInfoMap
)>::type
;
1769 PostMapType mem_post_info_map_
;
1770 // current recovered transaction we are rebuilding (recovery)
1771 WriteBatch
* rebuilding_trx_
;
1772 SequenceNumber rebuilding_trx_seq_
;
1773 // Increase seq number once per each write batch. Otherwise increase it once
1775 bool seq_per_batch_
;
1776 // Whether the memtable write will be done only after the commit
1777 bool write_after_commit_
;
1778 // Whether memtable write can be done before prepare
1779 bool write_before_prepare_
;
1780 // Whether this batch was unprepared or not
1781 bool unprepared_batch_
;
1782 using DupDetector
= std::aligned_storage
<sizeof(DuplicateDetector
)>::type
;
1783 DupDetector duplicate_detector_
;
1784 bool dup_dectector_on_
;
1786 bool hint_per_batch_
;
1788 // Hints for this batch
1789 using HintMap
= std::unordered_map
<MemTable
*, void*>;
1790 using HintMapType
= std::aligned_storage
<sizeof(HintMap
)>::type
;
1793 HintMap
& GetHintMap() {
1794 assert(hint_per_batch_
);
1795 if (!hint_created_
) {
1796 new (&hint_
) HintMap();
1797 hint_created_
= true;
1799 return *reinterpret_cast<HintMap
*>(&hint_
);
1802 MemPostInfoMap
& GetPostMap() {
1803 assert(concurrent_memtable_writes_
);
1804 if (!post_info_created_
) {
1805 new (&mem_post_info_map_
) MemPostInfoMap();
1806 post_info_created_
= true;
1808 return *reinterpret_cast<MemPostInfoMap
*>(&mem_post_info_map_
);
1811 bool IsDuplicateKeySeq(uint32_t column_family_id
, const Slice
& key
) {
1812 assert(!write_after_commit_
);
1813 assert(rebuilding_trx_
!= nullptr);
1814 if (!dup_dectector_on_
) {
1815 new (&duplicate_detector_
) DuplicateDetector(db_
);
1816 dup_dectector_on_
= true;
1818 return reinterpret_cast<DuplicateDetector
*>(&duplicate_detector_
)
1819 ->IsDuplicateKeySeq(column_family_id
, key
, sequence_
);
1822 const ProtectionInfoKVOC64
* NextProtectionInfo() {
1823 const ProtectionInfoKVOC64
* res
= nullptr;
1824 if (prot_info_
!= nullptr) {
1825 assert(prot_info_idx_
< prot_info_
->entries_
.size());
1826 res
= &prot_info_
->entries_
[prot_info_idx_
];
1832 void DecrementProtectionInfoIdxForTryAgain() {
1833 if (prot_info_
!= nullptr) --prot_info_idx_
;
1836 void ResetProtectionInfo() {
1838 prot_info_
= nullptr;
1842 Handler::OptionState
WriteBeforePrepare() const override
{
1843 return write_before_prepare_
? Handler::OptionState::kEnabled
1844 : Handler::OptionState::kDisabled
;
1846 Handler::OptionState
WriteAfterCommit() const override
{
1847 return write_after_commit_
? Handler::OptionState::kEnabled
1848 : Handler::OptionState::kDisabled
;
1852 // cf_mems should not be shared with concurrent inserters
1853 MemTableInserter(SequenceNumber _sequence
, ColumnFamilyMemTables
* cf_mems
,
1854 FlushScheduler
* flush_scheduler
,
1855 TrimHistoryScheduler
* trim_history_scheduler
,
1856 bool ignore_missing_column_families
,
1857 uint64_t recovering_log_number
, DB
* db
,
1858 bool concurrent_memtable_writes
,
1859 const WriteBatch::ProtectionInfo
* prot_info
,
1860 bool* has_valid_writes
= nullptr, bool seq_per_batch
= false,
1861 bool batch_per_txn
= true, bool hint_per_batch
= false)
1862 : sequence_(_sequence
),
1864 flush_scheduler_(flush_scheduler
),
1865 trim_history_scheduler_(trim_history_scheduler
),
1866 ignore_missing_column_families_(ignore_missing_column_families
),
1867 recovering_log_number_(recovering_log_number
),
1869 db_(static_cast_with_check
<DBImpl
>(db
)),
1870 concurrent_memtable_writes_(concurrent_memtable_writes
),
1871 post_info_created_(false),
1872 prot_info_(prot_info
),
1874 has_valid_writes_(has_valid_writes
),
1875 rebuilding_trx_(nullptr),
1876 rebuilding_trx_seq_(0),
1877 seq_per_batch_(seq_per_batch
),
1878 // Write after commit currently uses one seq per key (instead of per
1879 // batch). So seq_per_batch being false indicates write_after_commit
1881 write_after_commit_(!seq_per_batch
),
1882 // WriteUnprepared can write WriteBatches per transaction, so
1883 // batch_per_txn being false indicates write_before_prepare.
1884 write_before_prepare_(!batch_per_txn
),
1885 unprepared_batch_(false),
1886 duplicate_detector_(),
1887 dup_dectector_on_(false),
1888 hint_per_batch_(hint_per_batch
),
1889 hint_created_(false) {
1893 ~MemTableInserter() override
{
1894 if (dup_dectector_on_
) {
1895 reinterpret_cast<DuplicateDetector
*>(&duplicate_detector_
)
1896 ->~DuplicateDetector();
1898 if (post_info_created_
) {
1899 reinterpret_cast<MemPostInfoMap
*>(&mem_post_info_map_
)->~MemPostInfoMap();
1901 if (hint_created_
) {
1902 for (auto iter
: GetHintMap()) {
1903 delete[] reinterpret_cast<char*>(iter
.second
);
1905 reinterpret_cast<HintMap
*>(&hint_
)->~HintMap();
1907 delete rebuilding_trx_
;
1910 MemTableInserter(const MemTableInserter
&) = delete;
1911 MemTableInserter
& operator=(const MemTableInserter
&) = delete;
1913 // The batch seq is regularly restarted; In normal mode it is set when
1914 // MemTableInserter is constructed in the write thread and in recovery mode it
1915 // is set when a batch, which is tagged with seq, is read from the WAL.
1916 // Within a sequenced batch, which could be a merge of multiple batches, we
1917 // have two policies to advance the seq: i) seq_per_key (default) and ii)
1918 // seq_per_batch. To implement the latter we need to mark the boundary between
1919 // the individual batches. The approach is this: 1) Use the terminating
1920 // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
1921 // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absence of a
1922 // natural boundary marker.
1923 void MaybeAdvanceSeq(bool batch_boundry
= false) {
1924 if (batch_boundry
== seq_per_batch_
) {
1929 void set_log_number_ref(uint64_t log
) { log_number_ref_
= log
; }
1930 void set_prot_info(const WriteBatch::ProtectionInfo
* prot_info
) {
1931 prot_info_
= prot_info
;
1935 SequenceNumber
sequence() const { return sequence_
; }
1937 void PostProcess() {
1938 assert(concurrent_memtable_writes_
);
1939 // If post info was not created there is nothing
1940 // to process and no need to create on demand
1941 if (post_info_created_
) {
1942 for (auto& pair
: GetPostMap()) {
1943 pair
.first
->BatchPostProcess(pair
.second
);
1948 bool SeekToColumnFamily(uint32_t column_family_id
, Status
* s
) {
1949 // If we are in a concurrent mode, it is the caller's responsibility
1950 // to clone the original ColumnFamilyMemTables so that each thread
1951 // has its own instance. Otherwise, it must be guaranteed that there
1952 // is no concurrent access
1953 bool found
= cf_mems_
->Seek(column_family_id
);
1955 if (ignore_missing_column_families_
) {
1958 *s
= Status::InvalidArgument(
1959 "Invalid column family specified in write batch");
1963 if (recovering_log_number_
!= 0 &&
1964 recovering_log_number_
< cf_mems_
->GetLogNumber()) {
1965 // This is true only in recovery environment (recovering_log_number_ is
1967 // non-recovery, regular write code-path)
1968 // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
1969 // column family already contains updates from this log. We can't apply
1970 // updates twice because of update-in-place or merge workloads -- ignore
1976 if (has_valid_writes_
!= nullptr) {
1977 *has_valid_writes_
= true;
1980 if (log_number_ref_
> 0) {
1981 cf_mems_
->GetMemTable()->RefLogContainingPrepSection(log_number_ref_
);
1987 Status
PutCFImpl(uint32_t column_family_id
, const Slice
& key
,
1988 const Slice
& value
, ValueType value_type
,
1989 const ProtectionInfoKVOS64
* kv_prot_info
) {
1990 // optimize for non-recovery mode
1991 if (UNLIKELY(write_after_commit_
&& rebuilding_trx_
!= nullptr)) {
1992 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1993 return WriteBatchInternal::Put(rebuilding_trx_
, column_family_id
, key
,
1995 // else insert the values to the memtable right away
1999 if (UNLIKELY(!SeekToColumnFamily(column_family_id
, &ret_status
))) {
2000 if (ret_status
.ok() && rebuilding_trx_
!= nullptr) {
2001 assert(!write_after_commit_
);
2002 // The CF is probably flushed and hence no need for insert but we still
2003 // need to keep track of the keys for upcoming rollback/commit.
2004 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2005 ret_status
= WriteBatchInternal::Put(rebuilding_trx_
, column_family_id
,
2007 if (ret_status
.ok()) {
2008 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id
, key
));
2010 } else if (ret_status
.ok()) {
2011 MaybeAdvanceSeq(false /* batch_boundary */);
2015 assert(ret_status
.ok());
2017 MemTable
* mem
= cf_mems_
->GetMemTable();
2018 auto* moptions
= mem
->GetImmutableMemTableOptions();
2019 // inplace_update_support is inconsistent with snapshots, and therefore with
2020 // any kind of transactions including the ones that use seq_per_batch
2021 assert(!seq_per_batch_
|| !moptions
->inplace_update_support
);
2022 if (!moptions
->inplace_update_support
) {
2024 mem
->Add(sequence_
, value_type
, key
, value
, kv_prot_info
,
2025 concurrent_memtable_writes_
, get_post_process_info(mem
),
2026 hint_per_batch_
? &GetHintMap()[mem
] : nullptr);
2027 } else if (moptions
->inplace_callback
== nullptr ||
2028 value_type
!= kTypeValue
) {
2029 assert(!concurrent_memtable_writes_
);
2030 ret_status
= mem
->Update(sequence_
, value_type
, key
, value
, kv_prot_info
);
2032 assert(!concurrent_memtable_writes_
);
2033 assert(value_type
== kTypeValue
);
2034 ret_status
= mem
->UpdateCallback(sequence_
, key
, value
, kv_prot_info
);
2035 if (ret_status
.IsNotFound()) {
2036 // key not found in memtable. Do sst get, update, add
2037 SnapshotImpl read_from_snapshot
;
2038 read_from_snapshot
.number_
= sequence_
;
2040 // it's going to be overwritten for sure, so no point caching data block
2041 // containing the old version
2042 ropts
.fill_cache
= false;
2043 ropts
.snapshot
= &read_from_snapshot
;
2045 std::string prev_value
;
2046 std::string merged_value
;
2048 auto cf_handle
= cf_mems_
->GetColumnFamilyHandle();
2049 Status get_status
= Status::NotSupported();
2050 if (db_
!= nullptr && recovering_log_number_
== 0) {
2051 if (cf_handle
== nullptr) {
2052 cf_handle
= db_
->DefaultColumnFamily();
2054 // TODO (yanqin): fix when user-defined timestamp is enabled.
2055 get_status
= db_
->Get(ropts
, cf_handle
, key
, &prev_value
);
2057 // Intentionally overwrites the `NotFound` in `ret_status`.
2058 if (!get_status
.ok() && !get_status
.IsNotFound()) {
2059 ret_status
= get_status
;
2061 ret_status
= Status::OK();
2063 if (ret_status
.ok()) {
2064 UpdateStatus update_status
;
2065 char* prev_buffer
= const_cast<char*>(prev_value
.c_str());
2066 uint32_t prev_size
= static_cast<uint32_t>(prev_value
.size());
2067 if (get_status
.ok()) {
2068 update_status
= moptions
->inplace_callback(prev_buffer
, &prev_size
,
2069 value
, &merged_value
);
2071 update_status
= moptions
->inplace_callback(
2072 nullptr /* existing_value */, nullptr /* existing_value_size */,
2073 value
, &merged_value
);
2075 if (update_status
== UpdateStatus::UPDATED_INPLACE
) {
2076 assert(get_status
.ok());
2077 if (kv_prot_info
!= nullptr) {
2078 ProtectionInfoKVOS64
updated_kv_prot_info(*kv_prot_info
);
2079 updated_kv_prot_info
.UpdateV(value
,
2080 Slice(prev_buffer
, prev_size
));
2081 // prev_value is updated in-place with final value.
2082 ret_status
= mem
->Add(sequence_
, value_type
, key
,
2083 Slice(prev_buffer
, prev_size
),
2084 &updated_kv_prot_info
);
2086 ret_status
= mem
->Add(sequence_
, value_type
, key
,
2087 Slice(prev_buffer
, prev_size
),
2088 nullptr /* kv_prot_info */);
2090 if (ret_status
.ok()) {
2091 RecordTick(moptions
->statistics
, NUMBER_KEYS_WRITTEN
);
2093 } else if (update_status
== UpdateStatus::UPDATED
) {
2094 if (kv_prot_info
!= nullptr) {
2095 ProtectionInfoKVOS64
updated_kv_prot_info(*kv_prot_info
);
2096 updated_kv_prot_info
.UpdateV(value
, merged_value
);
2097 // merged_value contains the final value.
2098 ret_status
= mem
->Add(sequence_
, value_type
, key
,
2099 Slice(merged_value
), &updated_kv_prot_info
);
2101 // merged_value contains the final value.
2103 mem
->Add(sequence_
, value_type
, key
, Slice(merged_value
),
2104 nullptr /* kv_prot_info */);
2106 if (ret_status
.ok()) {
2107 RecordTick(moptions
->statistics
, NUMBER_KEYS_WRITTEN
);
2113 if (UNLIKELY(ret_status
.IsTryAgain())) {
2114 assert(seq_per_batch_
);
2115 const bool kBatchBoundary
= true;
2116 MaybeAdvanceSeq(kBatchBoundary
);
2117 } else if (ret_status
.ok()) {
2119 CheckMemtableFull();
2121 // optimize for non-recovery mode
2122 // If `ret_status` is `TryAgain` then the next (successful) try will add
2123 // the key to the rebuilding transaction object. If `ret_status` is
2124 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2125 // away. So we only need to add to it when `ret_status.ok()`.
2126 if (UNLIKELY(ret_status
.ok() && rebuilding_trx_
!= nullptr)) {
2127 assert(!write_after_commit_
);
2128 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2129 ret_status
= WriteBatchInternal::Put(rebuilding_trx_
, column_family_id
,
2135 Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
2136 const Slice
& value
) override
{
2137 const auto* kv_prot_info
= NextProtectionInfo();
2139 if (kv_prot_info
!= nullptr) {
2140 // Memtable needs seqno, doesn't need CF ID
2141 auto mem_kv_prot_info
=
2142 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2143 ret_status
= PutCFImpl(column_family_id
, key
, value
, kTypeValue
,
2146 ret_status
= PutCFImpl(column_family_id
, key
, value
, kTypeValue
,
2147 nullptr /* kv_prot_info */);
2149 // TODO: this assumes that if TryAgain status is returned to the caller,
2150 // the operation is actually tried again. The proper way to do this is to
2151 // pass a `try_again` parameter to the operation itself and decrement
2152 // prot_info_idx_ based on that
2153 if (UNLIKELY(ret_status
.IsTryAgain())) {
2154 DecrementProtectionInfoIdxForTryAgain();
2159 Status
PutEntityCF(uint32_t column_family_id
, const Slice
& key
,
2160 const Slice
& value
) override
{
2161 const auto* kv_prot_info
= NextProtectionInfo();
2165 // Memtable needs seqno, doesn't need CF ID
2166 auto mem_kv_prot_info
=
2167 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2168 s
= PutCFImpl(column_family_id
, key
, value
, kTypeWideColumnEntity
,
2171 s
= PutCFImpl(column_family_id
, key
, value
, kTypeWideColumnEntity
,
2172 /* kv_prot_info */ nullptr);
2175 if (UNLIKELY(s
.IsTryAgain())) {
2176 DecrementProtectionInfoIdxForTryAgain();
2182 Status
DeleteImpl(uint32_t /*column_family_id*/, const Slice
& key
,
2183 const Slice
& value
, ValueType delete_type
,
2184 const ProtectionInfoKVOS64
* kv_prot_info
) {
2186 MemTable
* mem
= cf_mems_
->GetMemTable();
2188 mem
->Add(sequence_
, delete_type
, key
, value
, kv_prot_info
,
2189 concurrent_memtable_writes_
, get_post_process_info(mem
),
2190 hint_per_batch_
? &GetHintMap()[mem
] : nullptr);
2191 if (UNLIKELY(ret_status
.IsTryAgain())) {
2192 assert(seq_per_batch_
);
2193 const bool kBatchBoundary
= true;
2194 MaybeAdvanceSeq(kBatchBoundary
);
2195 } else if (ret_status
.ok()) {
2197 CheckMemtableFull();
2202 Status
DeleteCF(uint32_t column_family_id
, const Slice
& key
) override
{
2203 const auto* kv_prot_info
= NextProtectionInfo();
2204 // optimize for non-recovery mode
2205 if (UNLIKELY(write_after_commit_
&& rebuilding_trx_
!= nullptr)) {
2206 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2207 return WriteBatchInternal::Delete(rebuilding_trx_
, column_family_id
, key
);
2208 // else insert the values to the memtable right away
2212 if (UNLIKELY(!SeekToColumnFamily(column_family_id
, &ret_status
))) {
2213 if (ret_status
.ok() && rebuilding_trx_
!= nullptr) {
2214 assert(!write_after_commit_
);
2215 // The CF is probably flushed and hence no need for insert but we still
2216 // need to keep track of the keys for upcoming rollback/commit.
2217 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2219 WriteBatchInternal::Delete(rebuilding_trx_
, column_family_id
, key
);
2220 if (ret_status
.ok()) {
2221 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id
, key
));
2223 } else if (ret_status
.ok()) {
2224 MaybeAdvanceSeq(false /* batch_boundary */);
2226 if (UNLIKELY(ret_status
.IsTryAgain())) {
2227 DecrementProtectionInfoIdxForTryAgain();
2232 ColumnFamilyData
* cfd
= cf_mems_
->current();
2233 assert(!cfd
|| cfd
->user_comparator());
2234 const size_t ts_sz
= (cfd
&& cfd
->user_comparator())
2235 ? cfd
->user_comparator()->timestamp_size()
2237 const ValueType delete_type
=
2238 (0 == ts_sz
) ? kTypeDeletion
: kTypeDeletionWithTimestamp
;
2239 if (kv_prot_info
!= nullptr) {
2240 auto mem_kv_prot_info
=
2241 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2242 mem_kv_prot_info
.UpdateO(kTypeDeletion
, delete_type
);
2243 ret_status
= DeleteImpl(column_family_id
, key
, Slice(), delete_type
,
2246 ret_status
= DeleteImpl(column_family_id
, key
, Slice(), delete_type
,
2247 nullptr /* kv_prot_info */);
2249 // optimize for non-recovery mode
2250 // If `ret_status` is `TryAgain` then the next (successful) try will add
2251 // the key to the rebuilding transaction object. If `ret_status` is
2252 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2253 // away. So we only need to add to it when `ret_status.ok()`.
2254 if (UNLIKELY(ret_status
.ok() && rebuilding_trx_
!= nullptr)) {
2255 assert(!write_after_commit_
);
2256 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2258 WriteBatchInternal::Delete(rebuilding_trx_
, column_family_id
, key
);
2260 if (UNLIKELY(ret_status
.IsTryAgain())) {
2261 DecrementProtectionInfoIdxForTryAgain();
2266 Status
SingleDeleteCF(uint32_t column_family_id
, const Slice
& key
) override
{
2267 const auto* kv_prot_info
= NextProtectionInfo();
2268 // optimize for non-recovery mode
2269 if (UNLIKELY(write_after_commit_
&& rebuilding_trx_
!= nullptr)) {
2270 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2271 return WriteBatchInternal::SingleDelete(rebuilding_trx_
, column_family_id
,
2273 // else insert the values to the memtable right away
2277 if (UNLIKELY(!SeekToColumnFamily(column_family_id
, &ret_status
))) {
2278 if (ret_status
.ok() && rebuilding_trx_
!= nullptr) {
2279 assert(!write_after_commit_
);
2280 // The CF is probably flushed and hence no need for insert but we still
2281 // need to keep track of the keys for upcoming rollback/commit.
2282 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2283 ret_status
= WriteBatchInternal::SingleDelete(rebuilding_trx_
,
2284 column_family_id
, key
);
2285 if (ret_status
.ok()) {
2286 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id
, key
));
2288 } else if (ret_status
.ok()) {
2289 MaybeAdvanceSeq(false /* batch_boundary */);
2291 if (UNLIKELY(ret_status
.IsTryAgain())) {
2292 DecrementProtectionInfoIdxForTryAgain();
2296 assert(ret_status
.ok());
2298 if (kv_prot_info
!= nullptr) {
2299 auto mem_kv_prot_info
=
2300 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2301 ret_status
= DeleteImpl(column_family_id
, key
, Slice(),
2302 kTypeSingleDeletion
, &mem_kv_prot_info
);
2304 ret_status
= DeleteImpl(column_family_id
, key
, Slice(),
2305 kTypeSingleDeletion
, nullptr /* kv_prot_info */);
2307 // optimize for non-recovery mode
2308 // If `ret_status` is `TryAgain` then the next (successful) try will add
2309 // the key to the rebuilding transaction object. If `ret_status` is
2310 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2311 // away. So we only need to add to it when `ret_status.ok()`.
2312 if (UNLIKELY(ret_status
.ok() && rebuilding_trx_
!= nullptr)) {
2313 assert(!write_after_commit_
);
2314 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2315 ret_status
= WriteBatchInternal::SingleDelete(rebuilding_trx_
,
2316 column_family_id
, key
);
2318 if (UNLIKELY(ret_status
.IsTryAgain())) {
2319 DecrementProtectionInfoIdxForTryAgain();
2324 Status
DeleteRangeCF(uint32_t column_family_id
, const Slice
& begin_key
,
2325 const Slice
& end_key
) override
{
2326 const auto* kv_prot_info
= NextProtectionInfo();
2327 // optimize for non-recovery mode
2328 if (UNLIKELY(write_after_commit_
&& rebuilding_trx_
!= nullptr)) {
2329 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2330 return WriteBatchInternal::DeleteRange(rebuilding_trx_
, column_family_id
,
2331 begin_key
, end_key
);
2332 // else insert the values to the memtable right away
2336 if (UNLIKELY(!SeekToColumnFamily(column_family_id
, &ret_status
))) {
2337 if (ret_status
.ok() && rebuilding_trx_
!= nullptr) {
2338 assert(!write_after_commit_
);
2339 // The CF is probably flushed and hence no need for insert but we still
2340 // need to keep track of the keys for upcoming rollback/commit.
2341 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2342 ret_status
= WriteBatchInternal::DeleteRange(
2343 rebuilding_trx_
, column_family_id
, begin_key
, end_key
);
2344 if (ret_status
.ok()) {
2345 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id
, begin_key
));
2347 } else if (ret_status
.ok()) {
2348 MaybeAdvanceSeq(false /* batch_boundary */);
2350 if (UNLIKELY(ret_status
.IsTryAgain())) {
2351 DecrementProtectionInfoIdxForTryAgain();
2355 assert(ret_status
.ok());
2357 if (db_
!= nullptr) {
2358 auto cf_handle
= cf_mems_
->GetColumnFamilyHandle();
2359 if (cf_handle
== nullptr) {
2360 cf_handle
= db_
->DefaultColumnFamily();
2363 static_cast_with_check
<ColumnFamilyHandleImpl
>(cf_handle
)->cfd();
2364 if (!cfd
->is_delete_range_supported()) {
2365 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
2366 ret_status
.PermitUncheckedError();
2367 return Status::NotSupported(
2368 std::string("DeleteRange not supported for table type ") +
2369 cfd
->ioptions()->table_factory
->Name() + " in CF " +
2373 cfd
->user_comparator()->CompareWithoutTimestamp(begin_key
, end_key
);
2375 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
2376 ret_status
.PermitUncheckedError();
2377 // It's an empty range where endpoints appear mistaken. Don't bother
2378 // applying it to the DB, and return an error to the user.
2379 return Status::InvalidArgument("end key comes before start key");
2380 } else if (cmp
== 0) {
2381 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
2382 ret_status
.PermitUncheckedError();
2383 // It's an empty range. Don't bother applying it to the DB.
2384 return Status::OK();
2388 if (kv_prot_info
!= nullptr) {
2389 auto mem_kv_prot_info
=
2390 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2391 ret_status
= DeleteImpl(column_family_id
, begin_key
, end_key
,
2392 kTypeRangeDeletion
, &mem_kv_prot_info
);
2394 ret_status
= DeleteImpl(column_family_id
, begin_key
, end_key
,
2395 kTypeRangeDeletion
, nullptr /* kv_prot_info */);
2397 // optimize for non-recovery mode
2398 // If `ret_status` is `TryAgain` then the next (successful) try will add
2399 // the key to the rebuilding transaction object. If `ret_status` is
2400 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2401 // away. So we only need to add to it when `ret_status.ok()`.
2402 if (UNLIKELY(!ret_status
.IsTryAgain() && rebuilding_trx_
!= nullptr)) {
2403 assert(!write_after_commit_
);
2404 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2405 ret_status
= WriteBatchInternal::DeleteRange(
2406 rebuilding_trx_
, column_family_id
, begin_key
, end_key
);
2408 if (UNLIKELY(ret_status
.IsTryAgain())) {
2409 DecrementProtectionInfoIdxForTryAgain();
2414 Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
2415 const Slice
& value
) override
{
2416 const auto* kv_prot_info
= NextProtectionInfo();
2417 // optimize for non-recovery mode
2418 if (UNLIKELY(write_after_commit_
&& rebuilding_trx_
!= nullptr)) {
2419 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2420 return WriteBatchInternal::Merge(rebuilding_trx_
, column_family_id
, key
,
2422 // else insert the values to the memtable right away
2426 if (UNLIKELY(!SeekToColumnFamily(column_family_id
, &ret_status
))) {
2427 if (ret_status
.ok() && rebuilding_trx_
!= nullptr) {
2428 assert(!write_after_commit_
);
2429 // The CF is probably flushed and hence no need for insert but we still
2430 // need to keep track of the keys for upcoming rollback/commit.
2431 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2432 ret_status
= WriteBatchInternal::Merge(rebuilding_trx_
,
2433 column_family_id
, key
, value
);
2434 if (ret_status
.ok()) {
2435 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id
, key
));
2437 } else if (ret_status
.ok()) {
2438 MaybeAdvanceSeq(false /* batch_boundary */);
2440 if (UNLIKELY(ret_status
.IsTryAgain())) {
2441 DecrementProtectionInfoIdxForTryAgain();
2445 assert(ret_status
.ok());
2447 MemTable
* mem
= cf_mems_
->GetMemTable();
2448 auto* moptions
= mem
->GetImmutableMemTableOptions();
2449 if (moptions
->merge_operator
== nullptr) {
2450 return Status::InvalidArgument(
2451 "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
2453 bool perform_merge
= false;
2454 assert(!concurrent_memtable_writes_
||
2455 moptions
->max_successive_merges
== 0);
2457 // If we pass DB through and options.max_successive_merges is hit
2458 // during recovery, Get() will be issued which will try to acquire
2459 // DB mutex and cause deadlock, as DB mutex is already held.
2460 // So we disable merge in recovery
2461 if (moptions
->max_successive_merges
> 0 && db_
!= nullptr &&
2462 recovering_log_number_
== 0) {
2463 assert(!concurrent_memtable_writes_
);
2464 LookupKey
lkey(key
, sequence_
);
2466 // Count the number of successive merges at the head
2467 // of the key in the memtable
2468 size_t num_merges
= mem
->CountSuccessiveMergeEntries(lkey
);
2470 if (num_merges
>= moptions
->max_successive_merges
) {
2471 perform_merge
= true;
2475 if (perform_merge
) {
2476 // 1) Get the existing value
2477 std::string get_value
;
2479 // Pass in the sequence number so that we also include previous merge
2480 // operations in the same batch.
2481 SnapshotImpl read_from_snapshot
;
2482 read_from_snapshot
.number_
= sequence_
;
2483 ReadOptions read_options
;
2484 read_options
.snapshot
= &read_from_snapshot
;
2486 auto cf_handle
= cf_mems_
->GetColumnFamilyHandle();
2487 if (cf_handle
== nullptr) {
2488 cf_handle
= db_
->DefaultColumnFamily();
2490 Status get_status
= db_
->Get(read_options
, cf_handle
, key
, &get_value
);
2491 if (!get_status
.ok()) {
2492 // Failed to read a key we know exists. Store the delta in memtable.
2493 perform_merge
= false;
2495 Slice get_value_slice
= Slice(get_value
);
2497 // 2) Apply this merge
2498 auto merge_operator
= moptions
->merge_operator
;
2499 assert(merge_operator
);
2501 std::string new_value
;
2502 Status merge_status
= MergeHelper::TimedFullMerge(
2503 merge_operator
, key
, &get_value_slice
, {value
}, &new_value
,
2504 moptions
->info_log
, moptions
->statistics
,
2505 SystemClock::Default().get(), /* result_operand */ nullptr,
2506 /* update_num_ops_stats */ false);
2508 if (!merge_status
.ok()) {
2510 // Store the delta in memtable
2511 perform_merge
= false;
2513 // 3) Add value to memtable
2514 assert(!concurrent_memtable_writes_
);
2515 if (kv_prot_info
!= nullptr) {
2516 auto merged_kv_prot_info
=
2517 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2518 merged_kv_prot_info
.UpdateV(value
, new_value
);
2519 merged_kv_prot_info
.UpdateO(kTypeMerge
, kTypeValue
);
2520 ret_status
= mem
->Add(sequence_
, kTypeValue
, key
, new_value
,
2521 &merged_kv_prot_info
);
2523 ret_status
= mem
->Add(sequence_
, kTypeValue
, key
, new_value
,
2524 nullptr /* kv_prot_info */);
2530 if (!perform_merge
) {
2531 assert(ret_status
.ok());
2532 // Add merge operand to memtable
2533 if (kv_prot_info
!= nullptr) {
2534 auto mem_kv_prot_info
=
2535 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2537 mem
->Add(sequence_
, kTypeMerge
, key
, value
, &mem_kv_prot_info
,
2538 concurrent_memtable_writes_
, get_post_process_info(mem
));
2540 ret_status
= mem
->Add(
2541 sequence_
, kTypeMerge
, key
, value
, nullptr /* kv_prot_info */,
2542 concurrent_memtable_writes_
, get_post_process_info(mem
));
2546 if (UNLIKELY(ret_status
.IsTryAgain())) {
2547 assert(seq_per_batch_
);
2548 const bool kBatchBoundary
= true;
2549 MaybeAdvanceSeq(kBatchBoundary
);
2550 } else if (ret_status
.ok()) {
2552 CheckMemtableFull();
2554 // optimize for non-recovery mode
2555 // If `ret_status` is `TryAgain` then the next (successful) try will add
2556 // the key to the rebuilding transaction object. If `ret_status` is
2557 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2558 // away. So we only need to add to it when `ret_status.ok()`.
2559 if (UNLIKELY(ret_status
.ok() && rebuilding_trx_
!= nullptr)) {
2560 assert(!write_after_commit_
);
2561 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2562 ret_status
= WriteBatchInternal::Merge(rebuilding_trx_
, column_family_id
,
2565 if (UNLIKELY(ret_status
.IsTryAgain())) {
2566 DecrementProtectionInfoIdxForTryAgain();
2571 Status
PutBlobIndexCF(uint32_t column_family_id
, const Slice
& key
,
2572 const Slice
& value
) override
{
2573 const auto* kv_prot_info
= NextProtectionInfo();
2575 if (kv_prot_info
!= nullptr) {
2576 // Memtable needs seqno, doesn't need CF ID
2577 auto mem_kv_prot_info
=
2578 kv_prot_info
->StripC(column_family_id
).ProtectS(sequence_
);
2579 // Same as PutCF except for value type.
2580 ret_status
= PutCFImpl(column_family_id
, key
, value
, kTypeBlobIndex
,
2583 ret_status
= PutCFImpl(column_family_id
, key
, value
, kTypeBlobIndex
,
2584 nullptr /* kv_prot_info */);
2586 if (UNLIKELY(ret_status
.IsTryAgain())) {
2587 DecrementProtectionInfoIdxForTryAgain();
2592 void CheckMemtableFull() {
2593 if (flush_scheduler_
!= nullptr) {
2594 auto* cfd
= cf_mems_
->current();
2595 assert(cfd
!= nullptr);
2596 if (cfd
->mem()->ShouldScheduleFlush() &&
2597 cfd
->mem()->MarkFlushScheduled()) {
2598 // MarkFlushScheduled only returns true if we are the one that
2599 // should take action, so no need to dedup further
2600 flush_scheduler_
->ScheduleWork(cfd
);
2603 // check if memtable_list size exceeds max_write_buffer_size_to_maintain
2604 if (trim_history_scheduler_
!= nullptr) {
2605 auto* cfd
= cf_mems_
->current();
2608 assert(cfd
->ioptions());
2610 const size_t size_to_maintain
= static_cast<size_t>(
2611 cfd
->ioptions()->max_write_buffer_size_to_maintain
);
2613 if (size_to_maintain
> 0) {
2614 MemTableList
* const imm
= cfd
->imm();
2617 if (imm
->HasHistory()) {
2618 const MemTable
* const mem
= cfd
->mem();
2621 if (mem
->MemoryAllocatedBytes() +
2622 imm
->MemoryAllocatedBytesExcludingLast() >=
2624 imm
->MarkTrimHistoryNeeded()) {
2625 trim_history_scheduler_
->ScheduleWork(cfd
);
2632 // The write batch handler calls MarkBeginPrepare with unprepare set to true
2633 // if it encounters the kTypeBeginUnprepareXID marker.
2634 Status
MarkBeginPrepare(bool unprepare
) override
{
2635 assert(rebuilding_trx_
== nullptr);
2638 if (recovering_log_number_
!= 0) {
2639 db_
->mutex()->AssertHeld();
2640 // during recovery we rebuild a hollow transaction
2641 // from all encountered prepare sections of the wal
2642 if (db_
->allow_2pc() == false) {
2643 return Status::NotSupported(
2644 "WAL contains prepared transactions. Open with "
2645 "TransactionDB::Open().");
2648 // we are now iterating through a prepared section
2649 rebuilding_trx_
= new WriteBatch();
2650 rebuilding_trx_seq_
= sequence_
;
2651 // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
2652 // unprepared_batch_ should be false because it is false by default, and
2653 // gets reset to false in MarkEndPrepare.
2654 assert(!unprepared_batch_
);
2655 unprepared_batch_
= unprepare
;
2657 if (has_valid_writes_
!= nullptr) {
2658 *has_valid_writes_
= true;
2662 return Status::OK();
2665 Status
MarkEndPrepare(const Slice
& name
) override
{
2667 assert((rebuilding_trx_
!= nullptr) == (recovering_log_number_
!= 0));
2669 if (recovering_log_number_
!= 0) {
2670 db_
->mutex()->AssertHeld();
2671 assert(db_
->allow_2pc());
2674 ? 0 // 0 will disable further checks
2675 : static_cast<size_t>(sequence_
- rebuilding_trx_seq_
+ 1);
2676 db_
->InsertRecoveredTransaction(recovering_log_number_
, name
.ToString(),
2677 rebuilding_trx_
, rebuilding_trx_seq_
,
2678 batch_cnt
, unprepared_batch_
);
2679 unprepared_batch_
= false;
2680 rebuilding_trx_
= nullptr;
2682 assert(rebuilding_trx_
== nullptr);
2684 const bool batch_boundry
= true;
2685 MaybeAdvanceSeq(batch_boundry
);
2687 return Status::OK();
2690 Status
MarkNoop(bool empty_batch
) override
{
2691 if (recovering_log_number_
!= 0) {
2692 db_
->mutex()->AssertHeld();
2694 // A hack in pessimistic transaction could result into a noop at the start
2695 // of the write batch, that should be ignored.
2697 // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
2698 // a batch. This happens when write batch commits skipping the prepare
2700 const bool batch_boundry
= true;
2701 MaybeAdvanceSeq(batch_boundry
);
2703 return Status::OK();
2706 Status
MarkCommit(const Slice
& name
) override
{
2711 if (recovering_log_number_
!= 0) {
2712 // We must hold db mutex in recovery.
2713 db_
->mutex()->AssertHeld();
2714 // in recovery when we encounter a commit marker
2715 // we lookup this transaction in our set of rebuilt transactions
2717 auto trx
= db_
->GetRecoveredTransaction(name
.ToString());
2719 // the log containing the prepared section may have
2720 // been released in the last incarnation because the
2721 // data was flushed to L0
2722 if (trx
!= nullptr) {
2723 // at this point individual CF lognumbers will prevent
2724 // duplicate re-insertion of values.
2725 assert(log_number_ref_
== 0);
2726 if (write_after_commit_
) {
2727 // write_after_commit_ can only have one batch in trx.
2728 assert(trx
->batches_
.size() == 1);
2729 const auto& batch_info
= trx
->batches_
.begin()->second
;
2730 // all inserts must reference this trx log number
2731 log_number_ref_
= batch_info
.log_number_
;
2732 ResetProtectionInfo();
2733 s
= batch_info
.batch_
->Iterate(this);
2734 log_number_ref_
= 0;
2736 // else the values are already inserted before the commit
2739 db_
->DeleteRecoveredTransaction(name
.ToString());
2741 if (has_valid_writes_
!= nullptr) {
2742 *has_valid_writes_
= true;
2746 // When writes are not delayed until commit, there is no disconnect
2747 // between a memtable write and the WAL that supports it. So the commit
2748 // need not reference any log as the only log to which it depends.
2749 assert(!write_after_commit_
|| log_number_ref_
> 0);
2751 const bool batch_boundry
= true;
2752 MaybeAdvanceSeq(batch_boundry
);
2754 if (UNLIKELY(s
.IsTryAgain())) {
2755 DecrementProtectionInfoIdxForTryAgain();
2761 Status
MarkCommitWithTimestamp(const Slice
& name
,
2762 const Slice
& commit_ts
) override
{
2767 if (recovering_log_number_
!= 0) {
2768 // In recovery, db mutex must be held.
2769 db_
->mutex()->AssertHeld();
2770 // in recovery when we encounter a commit marker
2771 // we lookup this transaction in our set of rebuilt transactions
2773 auto trx
= db_
->GetRecoveredTransaction(name
.ToString());
2774 // the log containing the prepared section may have
2775 // been released in the last incarnation because the
2776 // data was flushed to L0
2778 // at this point individual CF lognumbers will prevent
2779 // duplicate re-insertion of values.
2780 assert(0 == log_number_ref_
);
2781 if (write_after_commit_
) {
2782 // write_after_commit_ can only have one batch in trx.
2783 assert(trx
->batches_
.size() == 1);
2784 const auto& batch_info
= trx
->batches_
.begin()->second
;
2785 // all inserts must reference this trx log number
2786 log_number_ref_
= batch_info
.log_number_
;
2788 s
= batch_info
.batch_
->UpdateTimestamps(
2789 commit_ts
, [this](uint32_t cf
) {
2791 VersionSet
* const vset
= db_
->GetVersionSet();
2793 ColumnFamilySet
* const cf_set
= vset
->GetColumnFamilySet();
2795 ColumnFamilyData
* cfd
= cf_set
->GetColumnFamily(cf
);
2797 const auto* const ucmp
= cfd
->user_comparator();
2799 return ucmp
->timestamp_size();
2802 ResetProtectionInfo();
2803 s
= batch_info
.batch_
->Iterate(this);
2804 log_number_ref_
= 0;
2807 // else the values are already inserted before the commit
2810 db_
->DeleteRecoveredTransaction(name
.ToString());
2812 if (has_valid_writes_
) {
2813 *has_valid_writes_
= true;
2817 // When writes are not delayed until commit, there is no connection
2818 // between a memtable write and the WAL that supports it. So the commit
2819 // need not reference any log as the only log to which it depends.
2820 assert(!write_after_commit_
|| log_number_ref_
> 0);
2822 constexpr bool batch_boundary
= true;
2823 MaybeAdvanceSeq(batch_boundary
);
2825 if (UNLIKELY(s
.IsTryAgain())) {
2826 DecrementProtectionInfoIdxForTryAgain();
2832 Status
MarkRollback(const Slice
& name
) override
{
2835 if (recovering_log_number_
!= 0) {
2836 auto trx
= db_
->GetRecoveredTransaction(name
.ToString());
2838 // the log containing the transactions prep section
2839 // may have been released in the previous incarnation
2840 // because we knew it had been rolled back
2841 if (trx
!= nullptr) {
2842 db_
->DeleteRecoveredTransaction(name
.ToString());
2845 // in non recovery we simply ignore this tag
2848 const bool batch_boundry
= true;
2849 MaybeAdvanceSeq(batch_boundry
);
2851 return Status::OK();
2855 MemTablePostProcessInfo
* get_post_process_info(MemTable
* mem
) {
2856 if (!concurrent_memtable_writes_
) {
2857 // No need to batch counters locally if we don't use concurrent mode.
2860 return &GetPostMap()[mem
];
2864 } // anonymous namespace
2866 // This function can only be called in these conditions:
2867 // 1) During Recovery()
2868 // 2) During Write(), in a single-threaded write thread
2869 // 3) During Write(), in a concurrent context where memtables has been cloned
2870 // The reason is that it calls memtables->Seek(), which has a stateful cache
2871 Status
WriteBatchInternal::InsertInto(
2872 WriteThread::WriteGroup
& write_group
, SequenceNumber sequence
,
2873 ColumnFamilyMemTables
* memtables
, FlushScheduler
* flush_scheduler
,
2874 TrimHistoryScheduler
* trim_history_scheduler
,
2875 bool ignore_missing_column_families
, uint64_t recovery_log_number
, DB
* db
,
2876 bool concurrent_memtable_writes
, bool seq_per_batch
, bool batch_per_txn
) {
2877 MemTableInserter
inserter(
2878 sequence
, memtables
, flush_scheduler
, trim_history_scheduler
,
2879 ignore_missing_column_families
, recovery_log_number
, db
,
2880 concurrent_memtable_writes
, nullptr /* prot_info */,
2881 nullptr /*has_valid_writes*/, seq_per_batch
, batch_per_txn
);
2882 for (auto w
: write_group
) {
2883 if (w
->CallbackFailed()) {
2886 w
->sequence
= inserter
.sequence();
2887 if (!w
->ShouldWriteToMemtable()) {
2888 // In seq_per_batch_ mode this advances the seq by one.
2889 inserter
.MaybeAdvanceSeq(true);
2892 SetSequence(w
->batch
, inserter
.sequence());
2893 inserter
.set_log_number_ref(w
->log_ref
);
2894 inserter
.set_prot_info(w
->batch
->prot_info_
.get());
2895 w
->status
= w
->batch
->Iterate(&inserter
);
2896 if (!w
->status
.ok()) {
2899 assert(!seq_per_batch
|| w
->batch_cnt
!= 0);
2900 assert(!seq_per_batch
|| inserter
.sequence() - w
->sequence
== w
->batch_cnt
);
2902 return Status::OK();
2905 Status
WriteBatchInternal::InsertInto(
2906 WriteThread::Writer
* writer
, SequenceNumber sequence
,
2907 ColumnFamilyMemTables
* memtables
, FlushScheduler
* flush_scheduler
,
2908 TrimHistoryScheduler
* trim_history_scheduler
,
2909 bool ignore_missing_column_families
, uint64_t log_number
, DB
* db
,
2910 bool concurrent_memtable_writes
, bool seq_per_batch
, size_t batch_cnt
,
2911 bool batch_per_txn
, bool hint_per_batch
) {
2915 assert(writer
->ShouldWriteToMemtable());
2916 MemTableInserter
inserter(sequence
, memtables
, flush_scheduler
,
2917 trim_history_scheduler
,
2918 ignore_missing_column_families
, log_number
, db
,
2919 concurrent_memtable_writes
, nullptr /* prot_info */,
2920 nullptr /*has_valid_writes*/, seq_per_batch
,
2921 batch_per_txn
, hint_per_batch
);
2922 SetSequence(writer
->batch
, sequence
);
2923 inserter
.set_log_number_ref(writer
->log_ref
);
2924 inserter
.set_prot_info(writer
->batch
->prot_info_
.get());
2925 Status s
= writer
->batch
->Iterate(&inserter
);
2926 assert(!seq_per_batch
|| batch_cnt
!= 0);
2927 assert(!seq_per_batch
|| inserter
.sequence() - sequence
== batch_cnt
);
2928 if (concurrent_memtable_writes
) {
2929 inserter
.PostProcess();
2934 Status
WriteBatchInternal::InsertInto(
2935 const WriteBatch
* batch
, ColumnFamilyMemTables
* memtables
,
2936 FlushScheduler
* flush_scheduler
,
2937 TrimHistoryScheduler
* trim_history_scheduler
,
2938 bool ignore_missing_column_families
, uint64_t log_number
, DB
* db
,
2939 bool concurrent_memtable_writes
, SequenceNumber
* next_seq
,
2940 bool* has_valid_writes
, bool seq_per_batch
, bool batch_per_txn
) {
2941 MemTableInserter
inserter(Sequence(batch
), memtables
, flush_scheduler
,
2942 trim_history_scheduler
,
2943 ignore_missing_column_families
, log_number
, db
,
2944 concurrent_memtable_writes
, batch
->prot_info_
.get(),
2945 has_valid_writes
, seq_per_batch
, batch_per_txn
);
2946 Status s
= batch
->Iterate(&inserter
);
2947 if (next_seq
!= nullptr) {
2948 *next_seq
= inserter
.sequence();
2950 if (concurrent_memtable_writes
) {
2951 inserter
.PostProcess();
2958 // This class updates protection info for a WriteBatch.
2959 class ProtectionInfoUpdater
: public WriteBatch::Handler
{
2961 explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo
* prot_info
)
2962 : prot_info_(prot_info
) {}
2964 ~ProtectionInfoUpdater() override
{}
2966 Status
PutCF(uint32_t cf
, const Slice
& key
, const Slice
& val
) override
{
2967 return UpdateProtInfo(cf
, key
, val
, kTypeValue
);
2970 Status
PutEntityCF(uint32_t cf
, const Slice
& key
,
2971 const Slice
& entity
) override
{
2972 return UpdateProtInfo(cf
, key
, entity
, kTypeWideColumnEntity
);
2975 Status
DeleteCF(uint32_t cf
, const Slice
& key
) override
{
2976 return UpdateProtInfo(cf
, key
, "", kTypeDeletion
);
2979 Status
SingleDeleteCF(uint32_t cf
, const Slice
& key
) override
{
2980 return UpdateProtInfo(cf
, key
, "", kTypeSingleDeletion
);
2983 Status
DeleteRangeCF(uint32_t cf
, const Slice
& begin_key
,
2984 const Slice
& end_key
) override
{
2985 return UpdateProtInfo(cf
, begin_key
, end_key
, kTypeRangeDeletion
);
2988 Status
MergeCF(uint32_t cf
, const Slice
& key
, const Slice
& val
) override
{
2989 return UpdateProtInfo(cf
, key
, val
, kTypeMerge
);
2992 Status
PutBlobIndexCF(uint32_t cf
, const Slice
& key
,
2993 const Slice
& val
) override
{
2994 return UpdateProtInfo(cf
, key
, val
, kTypeBlobIndex
);
2997 Status
MarkBeginPrepare(bool /* unprepare */) override
{
2998 return Status::OK();
3001 Status
MarkEndPrepare(const Slice
& /* xid */) override
{
3002 return Status::OK();
3005 Status
MarkCommit(const Slice
& /* xid */) override
{ return Status::OK(); }
3007 Status
MarkCommitWithTimestamp(const Slice
& /* xid */,
3008 const Slice
& /* ts */) override
{
3009 return Status::OK();
3012 Status
MarkRollback(const Slice
& /* xid */) override
{ return Status::OK(); }
3014 Status
MarkNoop(bool /* empty_batch */) override
{ return Status::OK(); }
3017 Status
UpdateProtInfo(uint32_t cf
, const Slice
& key
, const Slice
& val
,
3018 const ValueType op_type
) {
3020 prot_info_
->entries_
.emplace_back(
3021 ProtectionInfo64().ProtectKVO(key
, val
, op_type
).ProtectC(cf
));
3023 return Status::OK();
3027 ProtectionInfoUpdater(const ProtectionInfoUpdater
&) = delete;
3028 ProtectionInfoUpdater(ProtectionInfoUpdater
&&) = delete;
3029 ProtectionInfoUpdater
& operator=(const ProtectionInfoUpdater
&) = delete;
3030 ProtectionInfoUpdater
& operator=(ProtectionInfoUpdater
&&) = delete;
3032 WriteBatch::ProtectionInfo
* const prot_info_
= nullptr;
3035 } // anonymous namespace
3037 Status
WriteBatchInternal::SetContents(WriteBatch
* b
, const Slice
& contents
) {
3038 assert(contents
.size() >= WriteBatchInternal::kHeader
);
3039 assert(b
->prot_info_
== nullptr);
3041 b
->rep_
.assign(contents
.data(), contents
.size());
3042 b
->content_flags_
.store(ContentFlags::DEFERRED
, std::memory_order_relaxed
);
3043 return Status::OK();
3046 Status
WriteBatchInternal::Append(WriteBatch
* dst
, const WriteBatch
* src
,
3047 const bool wal_only
) {
3048 assert(dst
->Count() == 0 ||
3049 (dst
->prot_info_
== nullptr) == (src
->prot_info_
== nullptr));
3050 if ((src
->prot_info_
!= nullptr &&
3051 src
->prot_info_
->entries_
.size() != src
->Count()) ||
3052 (dst
->prot_info_
!= nullptr &&
3053 dst
->prot_info_
->entries_
.size() != dst
->Count())) {
3054 return Status::Corruption(
3055 "Write batch has inconsistent count and number of checksums");
3062 const SavePoint
& batch_end
= src
->GetWalTerminationPoint();
3064 if (wal_only
&& !batch_end
.is_cleared()) {
3065 src_len
= batch_end
.size
- WriteBatchInternal::kHeader
;
3066 src_count
= batch_end
.count
;
3067 src_flags
= batch_end
.content_flags
;
3069 src_len
= src
->rep_
.size() - WriteBatchInternal::kHeader
;
3070 src_count
= Count(src
);
3071 src_flags
= src
->content_flags_
.load(std::memory_order_relaxed
);
3074 if (src
->prot_info_
!= nullptr) {
3075 if (dst
->prot_info_
== nullptr) {
3076 dst
->prot_info_
.reset(new WriteBatch::ProtectionInfo());
3078 std::copy(src
->prot_info_
->entries_
.begin(),
3079 src
->prot_info_
->entries_
.begin() + src_count
,
3080 std::back_inserter(dst
->prot_info_
->entries_
));
3081 } else if (dst
->prot_info_
!= nullptr) {
3082 // dst has empty prot_info->entries
3083 // In this special case, we allow write batch without prot_info to
3084 // be appende to write batch with empty prot_info
3085 dst
->prot_info_
= nullptr;
3087 SetCount(dst
, Count(dst
) + src_count
);
3088 assert(src
->rep_
.size() >= WriteBatchInternal::kHeader
);
3089 dst
->rep_
.append(src
->rep_
.data() + WriteBatchInternal::kHeader
, src_len
);
3090 dst
->content_flags_
.store(
3091 dst
->content_flags_
.load(std::memory_order_relaxed
) | src_flags
,
3092 std::memory_order_relaxed
);
3093 return Status::OK();
3096 size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize
,
3097 size_t rightByteSize
) {
3098 if (leftByteSize
== 0 || rightByteSize
== 0) {
3099 return leftByteSize
+ rightByteSize
;
3101 return leftByteSize
+ rightByteSize
- WriteBatchInternal::kHeader
;
3105 Status
WriteBatchInternal::UpdateProtectionInfo(WriteBatch
* wb
,
3106 size_t bytes_per_key
,
3107 uint64_t* checksum
) {
3108 if (bytes_per_key
== 0) {
3109 if (wb
->prot_info_
!= nullptr) {
3110 wb
->prot_info_
.reset();
3111 return Status::OK();
3113 // Already not protected.
3114 return Status::OK();
3116 } else if (bytes_per_key
== 8) {
3117 if (wb
->prot_info_
== nullptr) {
3118 wb
->prot_info_
.reset(new WriteBatch::ProtectionInfo());
3119 ProtectionInfoUpdater
prot_info_updater(wb
->prot_info_
.get());
3120 Status s
= wb
->Iterate(&prot_info_updater
);
3121 if (s
.ok() && checksum
!= nullptr) {
3122 uint64_t expected_hash
= XXH3_64bits(wb
->rep_
.data(), wb
->rep_
.size());
3123 if (expected_hash
!= *checksum
) {
3124 return Status::Corruption("Write batch content corrupted.");
3129 // Already protected.
3130 return Status::OK();
3133 return Status::NotSupported(
3134 "WriteBatch protection info must be zero or eight bytes/key");
3137 } // namespace ROCKSDB_NAMESPACE