// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID
//    kTypeEndPrepareXID varstring
//    kTypeCommitXID varstring
//    kTypeCommitXIDAndTimestamp varstring varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID
//    kTypeBeginUnprepareXID
//    kTypeWideColumnEntity varstring varstring
//    kTypeColumnFamilyWideColumnEntity varint32 varstring varstring
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]
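//
// For illustration (a hypothetical encoding, not emitted anywhere in this
// file): a batch at sequence 100 holding the single record
// Put("foo", "bar") in the default column family would be laid out as
//    fixed64 sequence = 100 | fixed32 count = 1 |
//    kTypeValue | varint32 len = 3 | "foo" | varint32 len = 3 | "bar"
// i.e. the 12-byte header (see WriteBatchInternal::kHeader) followed by one
// tag byte and two varstrings.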

#include "rocksdb/write_batch.h"

#include <algorithm>
#include <limits>
#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/wide/wide_column_serialization.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/lang.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// anon namespace for file-local types
namespace {

enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
  HAS_PUT_ENTITY = 1 << 12,
};

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */,
                     const Slice& /* entity */) override {
    content_flags |= ContentFlags::HAS_PUT_ENTITY;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

}  // anonymous namespace

struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
                       size_t protection_bytes_per_key,
                       size_t default_cf_ts_sz)
    : content_flags_(0),
      max_bytes_(max_bytes),
      default_cf_ts_sz_(default_cf_ts_sz),
      rep_() {
  // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
  // entry.
  assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
  if (protection_bytes_per_key != 0) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
  }
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                   ? reserved_bytes
                   : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}
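
// Usage sketch (hypothetical, for illustration only): constructing a batch
// with 8 bytes of per-key checksum protection enabled:
//
//   WriteBatch batch(/* reserved_bytes */ 0, /* max_bytes */ 0,
//                    /* protection_bytes_per_key */ 8,
//                    /* default_cf_ts_sz */ 0);
//
// With protection enabled, every mutation appended below also records a
// ProtectionInfo entry that VerifyChecksum() can later check.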

WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), rep_(rep) {}

WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      default_cf_ts_sz_(src.default_cf_ts_sz_),
      rep_(src.rep_) {
  if (src.save_points_ != nullptr) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }
  if (src.prot_info_ != nullptr) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      default_cf_ts_sz_(src.default_cf_ts_sz_),
      rep_(std::move(src.rep_)) {}

WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() {}

WriteBatch::Handler::~Handler() {}

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() { return true; }

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  if (prot_info_ != nullptr) {
    prot_info_->entries_.clear();
  }
  wal_term_point_.clear();
  default_cf_ts_sz_ = 0;
}

uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); }

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    // Should we handle status here?
    Iterate(&classifier).PermitUncheckedError();
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}

void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

size_t WriteBatch::GetProtectionBytesPerKey() const {
  if (prot_info_ != nullptr) {
    return prot_info_->GetBytesPerKey();
  }
  return 0;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasPutEntity() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // This indicates that the prepared batch is also persisted in the db.
      // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
      // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXIDAndTimestamp:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad commit timestamp");
      }
      FALLTHROUGH_INTENDED;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    case kTypeColumnFamilyWideColumnEntity:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch PutEntity");
      }
      FALLTHROUGH_INTENDED;
    case kTypeWideColumnEntity:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch PutEntity");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  if (rep_.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  return WriteBatchInternal::Iterate(this, handler,
                                     WriteBatchInternal::kHeader, rep_.size());
}
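
// Usage sketch (hypothetical, for illustration only): Iterate() replays a
// batch through a user-provided Handler, e.g. to count Put records:
//
//   struct PutCounter : public WriteBatch::Handler {
//     size_t puts = 0;
//     Status PutCF(uint32_t, const Slice&, const Slice&) override {
//       ++puts;
//       return Status::OK();
//     }
//   };
//
//   PutCounter counter;
//   Status s = batch.Iterate(&counter);
//
// BatchContentClassifier above follows the same pattern.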

Status WriteBatchInternal::Iterate(const WriteBatch* wb,
                                   WriteBatch::Handler* handler, size_t begin,
                                   size_t end) {
  if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
    return Status::Corruption("Invalid start/end bounds for Iterate");
  }
  assert(begin <= end);
  Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
  bool whole_batch =
      (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());

  Slice key, value, blob, xid;

  // Sometimes a sub-batch starts with a Noop. We do not want such Noops to be
  // treated as batch boundary markers, or we would mis-count the number of
  // batches. To avoid that, we check whether the accumulated batch is still
  // empty when the next Noop is seen.
  bool empty_batch = true;
  uint32_t found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit() ==
            WriteBatch::Handler::OptionState::kDisabled) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled "
              "(in WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare() ==
            WriteBatch::Handler::OptionState::kEnabled) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        s = handler->MarkBeginPrepare();
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit() ==
            WriteBatch::Handler::OptionState::kEnabled) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        s = handler->MarkBeginPrepare(true /* unprepared */);
        assert(s.ok());
        empty_batch = false;
        if (handler->WriteAfterCommit() ==
            WriteBatch::Handler::OptionState::kEnabled) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled "
              "(in default WriteCommitted mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare() ==
            WriteBatch::Handler::OptionState::kDisabled) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        s = handler->MarkEndPrepare(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        s = handler->MarkCommit(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeCommitXIDAndTimestamp:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        // key stores the commit timestamp.
        assert(!key.empty());
        s = handler->MarkCommitWithTimestamp(xid, key);
        if (LIKELY(s.ok())) {
          empty_batch = true;
        }
        break;
      case kTypeRollbackXID:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        s = handler->MarkRollback(xid);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeNoop:
        s = handler->MarkNoop(empty_batch);
        assert(s.ok());
        empty_batch = true;
        break;
      case kTypeWideColumnEntity:
      case kTypeColumnFamilyWideColumnEntity:
        assert(wb->content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT_ENTITY));
        s = handler->PutEntityCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          ++found;
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && whole_batch &&
      found != WriteBatchInternal::Count(wb)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}

std::tuple<Status, uint32_t, size_t>
WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(
    WriteBatch* b, ColumnFamilyHandle* column_family) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  size_t ts_sz = 0;
  Status s;
  if (column_family) {
    const Comparator* const ucmp = column_family->GetComparator();
    if (ucmp) {
      ts_sz = ucmp->timestamp_size();
      if (0 == cf_id && b->default_cf_ts_sz_ != ts_sz) {
        s = Status::InvalidArgument("Default cf timestamp size mismatch");
      }
    }
  } else if (b->default_cf_ts_sz_ > 0) {
    ts_sz = b->default_cf_ts_sz_;
  }
  return std::make_tuple(s, cf_id, ts_sz);
}

namespace {
Status CheckColumnFamilyTimestampSize(ColumnFamilyHandle* column_family,
                                      const Slice& ts) {
  if (!column_family) {
    return Status::InvalidArgument("column family handle cannot be null");
  }
  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t cf_ts_sz = ucmp->timestamp_size();
  if (0 == cf_ts_sz) {
    return Status::InvalidArgument("timestamp disabled");
  }
  if (cf_ts_sz != ts.size()) {
    return Status::InvalidArgument("timestamp size mismatch");
  }
  return Status::OK();
}
}  // anonymous namespace

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_PUT,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // Technically the optype could've been `kTypeColumnFamilyValue` with the
    // CF ID encoded in the `WriteBatch`. That distinction is unimportant
    // however since we verify CF ID is correct, as well as all other fields
    // (a missing/extra encoded CF ID would corrupt another field). It is
    // convenient to consolidate on `kTypeValue` here as that is what will be
    // inserted into memtable.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::Put(this, cf_id, key, value);
  }

  needs_in_place_update_ts_ = true;
  has_key_with_ts_ = true;
  std::string dummy_ts(ts_sz, '\0');
  std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
  return WriteBatchInternal::Put(this, cf_id,
                                 SliceParts(key_with_ts.data(), 2),
                                 SliceParts(&value, 1));
}
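
// Note on the timestamp-sized padding above: for a column family whose
// comparator enables user-defined timestamps, the single-argument Put()
// appends the key with ts_sz zero bytes (a "dummy" timestamp) and marks the
// batch with needs_in_place_update_ts_, so the real timestamp can be filled
// in later (see UpdateTimestamps() below).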

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& ts, const Slice& value) {
  const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  has_key_with_ts_ = true;
  assert(column_family);
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  return WriteBatchInternal::Put(this, cf_id,
                                 SliceParts(key_with_ts.data(), 2),
                                 SliceParts(&value, 1));
}

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_PUT,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeValue)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (ts_sz == 0) {
    return WriteBatchInternal::Put(this, cf_id, key, value);
  }

  return Status::InvalidArgument(
      "Cannot call this method on column family enabling timestamp");
}

Status WriteBatchInternal::PutEntity(WriteBatch* b, uint32_t column_family_id,
                                     const Slice& key,
                                     const WideColumns& columns) {
  assert(b);

  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }

  WideColumns sorted_columns(columns);
  std::sort(sorted_columns.begin(), sorted_columns.end(),
            [](const WideColumn& lhs, const WideColumn& rhs) {
              return lhs.name().compare(rhs.name()) < 0;
            });

  std::string entity;
  const Status s = WideColumnSerialization::Serialize(sorted_columns, entity);
  if (!s.ok()) {
    return s;
  }

  if (entity.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("wide column entity is too large");
  }

  LocalSavePoint save(b);

  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeWideColumnEntity));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyWideColumnEntity));
    PutVarint32(&b->rep_, column_family_id);
  }

  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, entity);

  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_PUT_ENTITY,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, entity, kTypeWideColumnEntity)
            .ProtectC(column_family_id));
  }

  return save.commit();
}

Status WriteBatch::PutEntity(ColumnFamilyHandle* column_family,
                             const Slice& key, const WideColumns& columns) {
  if (!column_family) {
    return Status::InvalidArgument(
        "Cannot call this method without a column family handle");
  }

  Status s;
  uint32_t cf_id = 0;
  size_t ts_sz = 0;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (ts_sz) {
    return Status::InvalidArgument(
        "Cannot call this method on column family enabling timestamp");
  }

  return WriteBatchInternal::PutEntity(this, cf_id, key, columns);
}
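
// Usage sketch (hypothetical, for illustration only): writing a wide-column
// entity; the columns are sorted by name and serialized before being stored:
//
//   WideColumns columns{{"attr_a", "1"}, {"attr_b", "2"}};
//   Status s = batch.PutEntity(cf_handle, "key", columns);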

Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
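  // (rep_[12] is the first byte after the 12-byte header, i.e. the tag of the
  // batch's first record, which must still be the Noop placeholder here)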
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}

Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkCommitWithTimestamp(WriteBatch* b,
                                                   const Slice& xid,
                                                   const Slice& commit_ts) {
  assert(!commit_ts.empty());
  b->rep_.push_back(static_cast<char>(kTypeCommitXIDAndTimestamp));
  PutLengthPrefixedSlice(&b->rep_, commit_ts);
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::Delete(this, cf_id, key);
  }

  needs_in_place_update_ts_ = true;
  has_key_with_ts_ = true;
  std::string dummy_ts(ts_sz, '\0');
  std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
  return WriteBatchInternal::Delete(this, cf_id,
                                    SliceParts(key_with_ts.data(), 2));
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key,
                          const Slice& ts) {
  const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  assert(column_family);
  has_key_with_ts_ = true;
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  return WriteBatchInternal::Delete(this, cf_id,
                                    SliceParts(key_with_ts.data(), 2));
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
                        kTypeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::Delete(this, cf_id, key);
  }

  return Status::InvalidArgument(
      "Cannot call this method on column family enabling timestamp");
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, "" /* value */, kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::SingleDelete(this, cf_id, key);
  }

  needs_in_place_update_ts_ = true;
  has_key_with_ts_ = true;
  std::string dummy_ts(ts_sz, '\0');
  std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
  return WriteBatchInternal::SingleDelete(this, cf_id,
                                          SliceParts(key_with_ts.data(), 2));
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& ts) {
  const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  has_key_with_ts_ = true;
  assert(column_family);
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  return WriteBatchInternal::SingleDelete(this, cf_id,
                                          SliceParts(key_with_ts.data(), 2));
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key,
                        SliceParts(nullptr /* _parts */,
                                   0 /* _num_parts */) /* value */,
                        kTypeSingleDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::SingleDelete(this, cf_id, key);
  }

  return Status::InvalidArgument(
      "Cannot call this method on column family enabling timestamp");
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
  }

  needs_in_place_update_ts_ = true;
  has_key_with_ts_ = true;
  std::string dummy_ts(ts_sz, '\0');
  std::array<Slice, 2> begin_key_with_ts{{begin_key, dummy_ts}};
  std::array<Slice, 2> end_key_with_ts{{end_key, dummy_ts}};
  return WriteBatchInternal::DeleteRange(
      this, cf_id, SliceParts(begin_key_with_ts.data(), 2),
      SliceParts(end_key_with_ts.data(), 2));
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key,
                               const Slice& ts) {
  const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  assert(column_family);
  has_key_with_ts_ = true;
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{begin_key, ts}};
  std::array<Slice, 2> end_key_with_ts{{end_key, ts}};
  return WriteBatchInternal::DeleteRange(
      this, cf_id, SliceParts(key_with_ts.data(), 2),
      SliceParts(end_key_with_ts.data(), 2));
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    // In `DeleteRange()`, the end key is treated as the value.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(begin_key, end_key, kTypeRangeDeletion)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
  }

  return Status::InvalidArgument(
      "Cannot call this method on column family enabling timestamp");
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::Merge(this, cf_id, key, value);
  }

  needs_in_place_update_ts_ = true;
  has_key_with_ts_ = true;
  std::string dummy_ts(ts_sz, '\0');
  std::array<Slice, 2> key_with_ts{{key, dummy_ts}};

  return WriteBatchInternal::Merge(
      this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& ts, const Slice& value) {
  const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
  if (!s.ok()) {
    return s;
  }
  has_key_with_ts_ = true;
  assert(column_family);
  uint32_t cf_id = column_family->GetID();
  std::array<Slice, 2> key_with_ts{{key, ts}};
  return WriteBatchInternal::Merge(
      this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
                                             .ProtectKVO(key, value, kTypeMerge)
                                             .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  size_t ts_sz = 0;
  uint32_t cf_id = 0;
  Status s;

  std::tie(s, cf_id, ts_sz) =
      WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
                                                            column_family);

  if (!s.ok()) {
    return s;
  }

  if (0 == ts_sz) {
    return WriteBatchInternal::Merge(this, cf_id, key, value);
  }

  return Status::InvalidArgument(
      "Cannot call this method on column family enabling timestamp");
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  if (b->prot_info_ != nullptr) {
    // See comment in first `WriteBatchInternal::Put()` overload concerning the
    // `ValueType` argument passed to `ProtectKVO()`.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVO(key, value, kTypeBlobIndex)
            .ProtectC(column_family_id));
  }
  return save.commit();
}

Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}
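
// Note: log data is not indexed as a key-value record; as far as this file
// is concerned it is only surfaced through Handler::LogData() when the batch
// is iterated (see the kTypeLogData case in WriteBatchInternal::Iterate()).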

void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_.reset(new SavePoints());
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}

Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(static_cast<uint32_t>(savepoint.count) <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    if (prot_info_ != nullptr) {
      prot_info_->entries_.resize(savepoint.count);
    }
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}
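
// Usage sketch (hypothetical, for illustration only): savepoints make a
// suffix of the batch discardable:
//
//   batch.SetSavePoint();
//   batch.Put(cf_handle, "key", "value");
//   batch.RollbackToSavePoint();  // drops everything since SetSavePoint()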
1629
1630 Status WriteBatch::PopSavePoint() {
1631 if (save_points_ == nullptr || save_points_->stack.size() == 0) {
1632 return Status::NotFound();
1633 }
1634
1635 // Pop the most recent savepoint off the stack
1636 save_points_->stack.pop();
1637
1638 return Status::OK();
1639 }
1640
1641 Status WriteBatch::UpdateTimestamps(
1642 const Slice& ts, std::function<size_t(uint32_t)> ts_sz_func) {
1643 TimestampUpdater<decltype(ts_sz_func)> ts_updater(prot_info_.get(),
1644 std::move(ts_sz_func), ts);
1645 const Status s = Iterate(&ts_updater);
1646 if (s.ok()) {
1647 needs_in_place_update_ts_ = false;
1648 }
1649 return s;
1650 }
1651
1652 Status WriteBatch::VerifyChecksum() const {
1653 if (prot_info_ == nullptr) {
1654 return Status::OK();
1655 }
1656 Slice input(rep_.data() + WriteBatchInternal::kHeader,
1657 rep_.size() - WriteBatchInternal::kHeader);
1658 Slice key, value, blob, xid;
1659 char tag = 0;
1660 uint32_t column_family = 0; // default
1661 Status s;
1662 size_t prot_info_idx = 0;
1663 bool checksum_protected = true;
1664 while (!input.empty() && prot_info_idx < prot_info_->entries_.size()) {
1665 // In case key/value/column_family are not updated by
1666 // ReadRecordFromWriteBatch
1667 key.clear();
1668 value.clear();
1669 column_family = 0;
1670 s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
1671 &blob, &xid);
1672 if (!s.ok()) {
1673 return s;
1674 }
1675 checksum_protected = true;
1676 // Write batch checksum uses op_type without ColumnFamily (e.g., if op_type
1677 // in the write batch is kTypeColumnFamilyValue, kTypeValue is used to
1678 // compute the checksum), and encodes column family id separately. See
1679 // comment in first `WriteBatchInternal::Put()` for more detail.
1680 switch (tag) {
1681 case kTypeColumnFamilyValue:
1682 case kTypeValue:
1683 tag = kTypeValue;
1684 break;
1685 case kTypeColumnFamilyDeletion:
1686 case kTypeDeletion:
1687 tag = kTypeDeletion;
1688 break;
1689 case kTypeColumnFamilySingleDeletion:
1690 case kTypeSingleDeletion:
1691 tag = kTypeSingleDeletion;
1692 break;
1693 case kTypeColumnFamilyRangeDeletion:
1694 case kTypeRangeDeletion:
1695 tag = kTypeRangeDeletion;
1696 break;
1697 case kTypeColumnFamilyMerge:
1698 case kTypeMerge:
1699 tag = kTypeMerge;
1700 break;
1701 case kTypeColumnFamilyBlobIndex:
1702 case kTypeBlobIndex:
1703 tag = kTypeBlobIndex;
1704 break;
1705 case kTypeLogData:
1706 case kTypeBeginPrepareXID:
1707 case kTypeEndPrepareXID:
1708 case kTypeCommitXID:
1709 case kTypeRollbackXID:
1710 case kTypeNoop:
1711 case kTypeBeginPersistedPrepareXID:
1712 case kTypeBeginUnprepareXID:
1713 case kTypeDeletionWithTimestamp:
1714 case kTypeCommitXIDAndTimestamp:
1715 checksum_protected = false;
1716 break;
1717 case kTypeColumnFamilyWideColumnEntity:
1718 case kTypeWideColumnEntity:
1719 tag = kTypeWideColumnEntity;
1720 break;
1721 default:
1722 return Status::Corruption(
1723 "unknown WriteBatch tag",
1724 std::to_string(static_cast<unsigned int>(tag)));
1725 }
1726 if (checksum_protected) {
1727 s = prot_info_->entries_[prot_info_idx++]
1728 .StripC(column_family)
1729 .StripKVO(key, value, static_cast<ValueType>(tag))
1730 .GetStatus();
1731 if (!s.ok()) {
1732 return s;
1733 }
1734 }
1735 }
1736
1737 if (prot_info_idx != WriteBatchInternal::Count(this)) {
1738 return Status::Corruption("WriteBatch has wrong count");
1739 }
1740 assert(WriteBatchInternal::Count(this) == prot_info_->entries_.size());
1741 return Status::OK();
1742 }
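// Note: prot_info_ is only populated when the batch was constructed with
// protection enabled (or protection was enabled later via
// UpdateProtectionInfo() below). A sketch, assuming the constructor
// overload that takes protection_bytes_per_key:
//
//   WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
//                    8 /* protection_bytes_per_key */,
//                    0 /* default_cf_ts_sz */);
//   // ... fill the batch; each entry now carries per-key protection info
//   // that VerifyChecksum() can re-derive and compare.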
1743
1744 namespace {
1745
1746 class MemTableInserter : public WriteBatch::Handler {
1747 SequenceNumber sequence_;
1748 ColumnFamilyMemTables* const cf_mems_;
1749 FlushScheduler* const flush_scheduler_;
1750 TrimHistoryScheduler* const trim_history_scheduler_;
1751 const bool ignore_missing_column_families_;
1752 const uint64_t recovering_log_number_;
1753 // log number that all Memtables inserted into should reference
1754 uint64_t log_number_ref_;
1755 DBImpl* db_;
1756 const bool concurrent_memtable_writes_;
1757 bool post_info_created_;
1758 const WriteBatch::ProtectionInfo* prot_info_;
1759 size_t prot_info_idx_;
1760
1761 bool* has_valid_writes_;
1762 // On some platforms, default-constructing a map is too expensive on
1763 // the Write() path, as it causes memory allocations even when the map
1764 // is never used.
1765 // Make creation optional, but do not incur the additional allocation
1766 // of a std::unique_ptr.
1767 using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
1768 using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
1769 PostMapType mem_post_info_map_;
1770 // current recovered transaction we are rebuilding (recovery)
1771 WriteBatch* rebuilding_trx_;
1772 SequenceNumber rebuilding_trx_seq_;
1773 // Increase seq number once per each write batch. Otherwise increase it once
1774 // per key.
1775 bool seq_per_batch_;
1776 // Whether the memtable write will be done only after the commit
1777 bool write_after_commit_;
1778 // Whether memtable write can be done before prepare
1779 bool write_before_prepare_;
1780 // Whether this batch was unprepared or not
1781 bool unprepared_batch_;
1782 using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
1783 DupDetector duplicate_detector_;
1784 bool dup_detector_on_;
1785
1786 bool hint_per_batch_;
1787 bool hint_created_;
1788 // Hints for this batch
1789 using HintMap = std::unordered_map<MemTable*, void*>;
1790 using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
1791 HintMapType hint_;
1792
1793 HintMap& GetHintMap() {
1794 assert(hint_per_batch_);
1795 if (!hint_created_) {
1796 new (&hint_) HintMap();
1797 hint_created_ = true;
1798 }
1799 return *reinterpret_cast<HintMap*>(&hint_);
1800 }
1801
1802 MemPostInfoMap& GetPostMap() {
1803 assert(concurrent_memtable_writes_);
1804 if (!post_info_created_) {
1805 new (&mem_post_info_map_) MemPostInfoMap();
1806 post_info_created_ = true;
1807 }
1808 return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
1809 }
1810
1811 bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
1812 assert(!write_after_commit_);
1813 assert(rebuilding_trx_ != nullptr);
1814 if (!dup_detector_on_) {
1815 new (&duplicate_detector_) DuplicateDetector(db_);
1816 dup_detector_on_ = true;
1817 }
1818 return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
1819 ->IsDuplicateKeySeq(column_family_id, key, sequence_);
1820 }
1821
1822 const ProtectionInfoKVOC64* NextProtectionInfo() {
1823 const ProtectionInfoKVOC64* res = nullptr;
1824 if (prot_info_ != nullptr) {
1825 assert(prot_info_idx_ < prot_info_->entries_.size());
1826 res = &prot_info_->entries_[prot_info_idx_];
1827 ++prot_info_idx_;
1828 }
1829 return res;
1830 }
1831
1832 void DecrementProtectionInfoIdxForTryAgain() {
1833 if (prot_info_ != nullptr) --prot_info_idx_;
1834 }
1835
1836 void ResetProtectionInfo() {
1837 prot_info_idx_ = 0;
1838 prot_info_ = nullptr;
1839 }
1840
1841 protected:
1842 Handler::OptionState WriteBeforePrepare() const override {
1843 return write_before_prepare_ ? Handler::OptionState::kEnabled
1844 : Handler::OptionState::kDisabled;
1845 }
1846 Handler::OptionState WriteAfterCommit() const override {
1847 return write_after_commit_ ? Handler::OptionState::kEnabled
1848 : Handler::OptionState::kDisabled;
1849 }
1850
1851 public:
1852 // cf_mems should not be shared with concurrent inserters
1853 MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
1854 FlushScheduler* flush_scheduler,
1855 TrimHistoryScheduler* trim_history_scheduler,
1856 bool ignore_missing_column_families,
1857 uint64_t recovering_log_number, DB* db,
1858 bool concurrent_memtable_writes,
1859 const WriteBatch::ProtectionInfo* prot_info,
1860 bool* has_valid_writes = nullptr, bool seq_per_batch = false,
1861 bool batch_per_txn = true, bool hint_per_batch = false)
1862 : sequence_(_sequence),
1863 cf_mems_(cf_mems),
1864 flush_scheduler_(flush_scheduler),
1865 trim_history_scheduler_(trim_history_scheduler),
1866 ignore_missing_column_families_(ignore_missing_column_families),
1867 recovering_log_number_(recovering_log_number),
1868 log_number_ref_(0),
1869 db_(static_cast_with_check<DBImpl>(db)),
1870 concurrent_memtable_writes_(concurrent_memtable_writes),
1871 post_info_created_(false),
1872 prot_info_(prot_info),
1873 prot_info_idx_(0),
1874 has_valid_writes_(has_valid_writes),
1875 rebuilding_trx_(nullptr),
1876 rebuilding_trx_seq_(0),
1877 seq_per_batch_(seq_per_batch),
1878 // Write after commit currently uses one seq per key (instead of per
1879 // batch). So seq_per_batch being false indicates write_after_commit
1880 // approach.
1881 write_after_commit_(!seq_per_batch),
1882 // WriteUnprepared can write WriteBatches per transaction, so
1883 // batch_per_txn being false indicates write_before_prepare.
1884 write_before_prepare_(!batch_per_txn),
1885 unprepared_batch_(false),
1886 duplicate_detector_(),
1887 dup_detector_on_(false),
1888 hint_per_batch_(hint_per_batch),
1889 hint_created_(false) {
1890 assert(cf_mems_);
1891 }
1892
1893 ~MemTableInserter() override {
1894 if (dup_detector_on_) {
1895 reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
1896 ->~DuplicateDetector();
1897 }
1898 if (post_info_created_) {
1899 reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)->~MemPostInfoMap();
1900 }
1901 if (hint_created_) {
1902 for (auto iter : GetHintMap()) {
1903 delete[] reinterpret_cast<char*>(iter.second);
1904 }
1905 reinterpret_cast<HintMap*>(&hint_)->~HintMap();
1906 }
1907 delete rebuilding_trx_;
1908 }
1909
1910 MemTableInserter(const MemTableInserter&) = delete;
1911 MemTableInserter& operator=(const MemTableInserter&) = delete;
1912
1913 // The batch seq is regularly restarted; in normal mode it is set when
1914 // MemTableInserter is constructed in the write thread and in recovery mode it
1915 // is set when a batch, which is tagged with seq, is read from the WAL.
1916 // Within a sequenced batch, which could be a merge of multiple batches, we
1917 // have two policies to advance the seq: i) seq_per_key (default) and ii)
1918 // seq_per_batch. To implement the latter we need to mark the boundary between
1919 // the individual batches. The approach is this: 1) Use the terminating
1920 // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
1921 // kTypeRollbackXID); 2) Terminate a batch with kTypeNoop in the absence of a
1922 // natural boundary marker.
1923 void MaybeAdvanceSeq(bool batch_boundary = false) {
1924 if (batch_boundary == seq_per_batch_) {
1925 sequence_++;
1926 }
1927 }
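// Worked example: with seq_per_batch_ == false (seq-per-key), every
// per-key call MaybeAdvanceSeq(false) bumps sequence_ and boundary calls
// MaybeAdvanceSeq(true) are no-ops; with seq_per_batch_ == true it is the
// reverse, so a merged group of three sub-batches consumes exactly three
// sequence numbers regardless of how many keys each contains.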
1928
1929 void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
1930 void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
1931 prot_info_ = prot_info;
1932 prot_info_idx_ = 0;
1933 }
1934
1935 SequenceNumber sequence() const { return sequence_; }
1936
1937 void PostProcess() {
1938 assert(concurrent_memtable_writes_);
1939 // If post info was not created there is nothing
1940 // to process and no need to create on demand
1941 if (post_info_created_) {
1942 for (auto& pair : GetPostMap()) {
1943 pair.first->BatchPostProcess(pair.second);
1944 }
1945 }
1946 }
1947
1948 bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
1949 // If we are in a concurrent mode, it is the caller's responsibility
1950 // to clone the original ColumnFamilyMemTables so that each thread
1951 // has its own instance. Otherwise, it must be guaranteed that there
1952 // is no concurrent access
1953 bool found = cf_mems_->Seek(column_family_id);
1954 if (!found) {
1955 if (ignore_missing_column_families_) {
1956 *s = Status::OK();
1957 } else {
1958 *s = Status::InvalidArgument(
1959 "Invalid column family specified in write batch");
1960 }
1961 return false;
1962 }
1963 if (recovering_log_number_ != 0 &&
1964 recovering_log_number_ < cf_mems_->GetLogNumber()) {
1965 // This is true only in the recovery environment
1966 // (recovering_log_number_ is always 0 in the non-recovery, regular
1967 // write code-path).
1968 // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means
1969 // that the column family already contains updates from this log. We
1970 // can't apply updates twice because of update-in-place or merge
1971 // workloads -- ignore the update.
1972 *s = Status::OK();
1973 return false;
1974 }
1975
1976 if (has_valid_writes_ != nullptr) {
1977 *has_valid_writes_ = true;
1978 }
1979
1980 if (log_number_ref_ > 0) {
1981 cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
1982 }
1983
1984 return true;
1985 }
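// Return contract of SeekToColumnFamily(): `true` means the memtable for
// the CF was found and the write should be applied; `false` with
// `*s == OK` means "skip this key silently" (a missing CF that we were
// told to ignore, or a CF whose memtable already has this log's updates
// during recovery); `false` with a non-OK `*s` aborts the iteration.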
1986
1987 Status PutCFImpl(uint32_t column_family_id, const Slice& key,
1988 const Slice& value, ValueType value_type,
1989 const ProtectionInfoKVOS64* kv_prot_info) {
1990 // optimize for non-recovery mode
1991 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
1992 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
1993 return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
1994 value);
1995 // else insert the values into the memtable right away
1996 }
1997
1998 Status ret_status;
1999 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
2000 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
2001 assert(!write_after_commit_);
2002 // The CF is probably flushed and hence no need for insert but we still
2003 // need to keep track of the keys for upcoming rollback/commit.
2004 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2005 ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
2006 key, value);
2007 if (ret_status.ok()) {
2008 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
2009 }
2010 } else if (ret_status.ok()) {
2011 MaybeAdvanceSeq(false /* batch_boundary */);
2012 }
2013 return ret_status;
2014 }
2015 assert(ret_status.ok());
2016
2017 MemTable* mem = cf_mems_->GetMemTable();
2018 auto* moptions = mem->GetImmutableMemTableOptions();
2019 // inplace_update_support is inconsistent with snapshots, and therefore with
2020 // any kind of transactions including the ones that use seq_per_batch
2021 assert(!seq_per_batch_ || !moptions->inplace_update_support);
2022 if (!moptions->inplace_update_support) {
2023 ret_status =
2024 mem->Add(sequence_, value_type, key, value, kv_prot_info,
2025 concurrent_memtable_writes_, get_post_process_info(mem),
2026 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
2027 } else if (moptions->inplace_callback == nullptr ||
2028 value_type != kTypeValue) {
2029 assert(!concurrent_memtable_writes_);
2030 ret_status = mem->Update(sequence_, value_type, key, value, kv_prot_info);
2031 } else {
2032 assert(!concurrent_memtable_writes_);
2033 assert(value_type == kTypeValue);
2034 ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
2035 if (ret_status.IsNotFound()) {
2036 // key not found in memtable. Do sst get, update, add
2037 SnapshotImpl read_from_snapshot;
2038 read_from_snapshot.number_ = sequence_;
2039 ReadOptions ropts;
2040 // it's going to be overwritten for sure, so there is no point caching
2041 // the data block containing the old version
2042 ropts.fill_cache = false;
2043 ropts.snapshot = &read_from_snapshot;
2044
2045 std::string prev_value;
2046 std::string merged_value;
2047
2048 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
2049 Status get_status = Status::NotSupported();
2050 if (db_ != nullptr && recovering_log_number_ == 0) {
2051 if (cf_handle == nullptr) {
2052 cf_handle = db_->DefaultColumnFamily();
2053 }
2054 // TODO (yanqin): fix when user-defined timestamp is enabled.
2055 get_status = db_->Get(ropts, cf_handle, key, &prev_value);
2056 }
2057 // Intentionally overwrites the `NotFound` in `ret_status`.
2058 if (!get_status.ok() && !get_status.IsNotFound()) {
2059 ret_status = get_status;
2060 } else {
2061 ret_status = Status::OK();
2062 }
2063 if (ret_status.ok()) {
2064 UpdateStatus update_status;
2065 char* prev_buffer = const_cast<char*>(prev_value.c_str());
2066 uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
2067 if (get_status.ok()) {
2068 update_status = moptions->inplace_callback(prev_buffer, &prev_size,
2069 value, &merged_value);
2070 } else {
2071 update_status = moptions->inplace_callback(
2072 nullptr /* existing_value */, nullptr /* existing_value_size */,
2073 value, &merged_value);
2074 }
2075 if (update_status == UpdateStatus::UPDATED_INPLACE) {
2076 assert(get_status.ok());
2077 if (kv_prot_info != nullptr) {
2078 ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
2079 updated_kv_prot_info.UpdateV(value,
2080 Slice(prev_buffer, prev_size));
2081 // prev_value is updated in-place with final value.
2082 ret_status = mem->Add(sequence_, value_type, key,
2083 Slice(prev_buffer, prev_size),
2084 &updated_kv_prot_info);
2085 } else {
2086 ret_status = mem->Add(sequence_, value_type, key,
2087 Slice(prev_buffer, prev_size),
2088 nullptr /* kv_prot_info */);
2089 }
2090 if (ret_status.ok()) {
2091 RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
2092 }
2093 } else if (update_status == UpdateStatus::UPDATED) {
2094 if (kv_prot_info != nullptr) {
2095 ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
2096 updated_kv_prot_info.UpdateV(value, merged_value);
2097 // merged_value contains the final value.
2098 ret_status = mem->Add(sequence_, value_type, key,
2099 Slice(merged_value), &updated_kv_prot_info);
2100 } else {
2101 // merged_value contains the final value.
2102 ret_status =
2103 mem->Add(sequence_, value_type, key, Slice(merged_value),
2104 nullptr /* kv_prot_info */);
2105 }
2106 if (ret_status.ok()) {
2107 RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
2108 }
2109 }
2110 }
2111 }
2112 }
2113 if (UNLIKELY(ret_status.IsTryAgain())) {
2114 assert(seq_per_batch_);
2115 const bool kBatchBoundary = true;
2116 MaybeAdvanceSeq(kBatchBoundary);
2117 } else if (ret_status.ok()) {
2118 MaybeAdvanceSeq();
2119 CheckMemtableFull();
2120 }
2121 // optimize for non-recovery mode
2122 // If `ret_status` is `TryAgain` then the next (successful) try will add
2123 // the key to the rebuilding transaction object. If `ret_status` is
2124 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2125 // away. So we only need to add to it when `ret_status.ok()`.
2126 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
2127 assert(!write_after_commit_);
2128 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2129 ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
2130 key, value);
2131 }
2132 return ret_status;
2133 }
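// A minimal sketch of an inplace_callback that the UpdateCallback path
// above would invoke; reusing the existing buffer when the new value
// fits is an assumption chosen for illustration:
//
//   Options options;
//   options.inplace_update_support = true;
//   options.inplace_callback =
//       [](char* existing_value, uint32_t* existing_value_size,
//          Slice delta_value, std::string* merged_value) -> UpdateStatus {
//     if (existing_value != nullptr &&
//         delta_value.size() <= *existing_value_size) {
//       memcpy(existing_value, delta_value.data(), delta_value.size());
//       *existing_value_size = static_cast<uint32_t>(delta_value.size());
//       return UpdateStatus::UPDATED_INPLACE;  // PutCFImpl re-Adds buffer
//     }
//     merged_value->assign(delta_value.data(), delta_value.size());
//     return UpdateStatus::UPDATED;  // PutCFImpl Adds merged_value instead
//   };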
2134
2135 Status PutCF(uint32_t column_family_id, const Slice& key,
2136 const Slice& value) override {
2137 const auto* kv_prot_info = NextProtectionInfo();
2138 Status ret_status;
2139 if (kv_prot_info != nullptr) {
2140 // Memtable needs seqno, doesn't need CF ID
2141 auto mem_kv_prot_info =
2142 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2143 ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
2144 &mem_kv_prot_info);
2145 } else {
2146 ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
2147 nullptr /* kv_prot_info */);
2148 }
2149 // TODO: this assumes that if TryAgain status is returned to the caller,
2150 // the operation is actually tried again. The proper way to do this is to
2151 // pass a `try_again` parameter to the operation itself and decrement
2152 // prot_info_idx_ based on that
2153 if (UNLIKELY(ret_status.IsTryAgain())) {
2154 DecrementProtectionInfoIdxForTryAgain();
2155 }
2156 return ret_status;
2157 }
2158
2159 Status PutEntityCF(uint32_t column_family_id, const Slice& key,
2160 const Slice& value) override {
2161 const auto* kv_prot_info = NextProtectionInfo();
2162
2163 Status s;
2164 if (kv_prot_info) {
2165 // Memtable needs seqno, doesn't need CF ID
2166 auto mem_kv_prot_info =
2167 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2168 s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity,
2169 &mem_kv_prot_info);
2170 } else {
2171 s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity,
2172 /* kv_prot_info */ nullptr);
2173 }
2174
2175 if (UNLIKELY(s.IsTryAgain())) {
2176 DecrementProtectionInfoIdxForTryAgain();
2177 }
2178
2179 return s;
2180 }
2181
2182 Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
2183 const Slice& value, ValueType delete_type,
2184 const ProtectionInfoKVOS64* kv_prot_info) {
2185 Status ret_status;
2186 MemTable* mem = cf_mems_->GetMemTable();
2187 ret_status =
2188 mem->Add(sequence_, delete_type, key, value, kv_prot_info,
2189 concurrent_memtable_writes_, get_post_process_info(mem),
2190 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
2191 if (UNLIKELY(ret_status.IsTryAgain())) {
2192 assert(seq_per_batch_);
2193 const bool kBatchBoundary = true;
2194 MaybeAdvanceSeq(kBatchBoundary);
2195 } else if (ret_status.ok()) {
2196 MaybeAdvanceSeq();
2197 CheckMemtableFull();
2198 }
2199 return ret_status;
2200 }
2201
2202 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
2203 const auto* kv_prot_info = NextProtectionInfo();
2204 // optimize for non-recovery mode
2205 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
2206 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2207 return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
2208 // else insert the values into the memtable right away
2209 }
2210
2211 Status ret_status;
2212 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
2213 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
2214 assert(!write_after_commit_);
2215 // The CF is probably flushed and hence no need for insert but we still
2216 // need to keep track of the keys for upcoming rollback/commit.
2217 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2218 ret_status =
2219 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
2220 if (ret_status.ok()) {
2221 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
2222 }
2223 } else if (ret_status.ok()) {
2224 MaybeAdvanceSeq(false /* batch_boundary */);
2225 }
2226 if (UNLIKELY(ret_status.IsTryAgain())) {
2227 DecrementProtectionInfoIdxForTryAgain();
2228 }
2229 return ret_status;
2230 }
2231
2232 ColumnFamilyData* cfd = cf_mems_->current();
2233 assert(!cfd || cfd->user_comparator());
2234 const size_t ts_sz = (cfd && cfd->user_comparator())
2235 ? cfd->user_comparator()->timestamp_size()
2236 : 0;
2237 const ValueType delete_type =
2238 (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
2239 if (kv_prot_info != nullptr) {
2240 auto mem_kv_prot_info =
2241 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2242 mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
2243 ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
2244 &mem_kv_prot_info);
2245 } else {
2246 ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
2247 nullptr /* kv_prot_info */);
2248 }
2249 // optimize for non-recovery mode
2250 // If `ret_status` is `TryAgain` then the next (successful) try will add
2251 // the key to the rebuilding transaction object. If `ret_status` is
2252 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2253 // away. So we only need to add to it when `ret_status.ok()`.
2254 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
2255 assert(!write_after_commit_);
2256 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2257 ret_status =
2258 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
2259 }
2260 if (UNLIKELY(ret_status.IsTryAgain())) {
2261 DecrementProtectionInfoIdxForTryAgain();
2262 }
2263 return ret_status;
2264 }
2265
2266 Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
2267 const auto* kv_prot_info = NextProtectionInfo();
2268 // optimize for non-recovery mode
2269 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
2270 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2271 return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
2272 key);
2273 // else insert the values into the memtable right away
2274 }
2275
2276 Status ret_status;
2277 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
2278 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
2279 assert(!write_after_commit_);
2280 // The CF is probably flushed and hence no need for insert but we still
2281 // need to keep track of the keys for upcoming rollback/commit.
2282 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2283 ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
2284 column_family_id, key);
2285 if (ret_status.ok()) {
2286 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
2287 }
2288 } else if (ret_status.ok()) {
2289 MaybeAdvanceSeq(false /* batch_boundary */);
2290 }
2291 if (UNLIKELY(ret_status.IsTryAgain())) {
2292 DecrementProtectionInfoIdxForTryAgain();
2293 }
2294 return ret_status;
2295 }
2296 assert(ret_status.ok());
2297
2298 if (kv_prot_info != nullptr) {
2299 auto mem_kv_prot_info =
2300 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2301 ret_status = DeleteImpl(column_family_id, key, Slice(),
2302 kTypeSingleDeletion, &mem_kv_prot_info);
2303 } else {
2304 ret_status = DeleteImpl(column_family_id, key, Slice(),
2305 kTypeSingleDeletion, nullptr /* kv_prot_info */);
2306 }
2307 // optimize for non-recovery mode
2308 // If `ret_status` is `TryAgain` then the next (successful) try will add
2309 // the key to the rebuilding transaction object. If `ret_status` is
2310 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2311 // away. So we only need to add to it when `ret_status.ok()`.
2312 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
2313 assert(!write_after_commit_);
2314 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2315 ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
2316 column_family_id, key);
2317 }
2318 if (UNLIKELY(ret_status.IsTryAgain())) {
2319 DecrementProtectionInfoIdxForTryAgain();
2320 }
2321 return ret_status;
2322 }
2323
2324 Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
2325 const Slice& end_key) override {
2326 const auto* kv_prot_info = NextProtectionInfo();
2327 // optimize for non-recovery mode
2328 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
2329 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2330 return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
2331 begin_key, end_key);
2332 // else insert the values into the memtable right away
2333 }
2334
2335 Status ret_status;
2336 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
2337 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
2338 assert(!write_after_commit_);
2339 // The CF is probably flushed and hence no need for insert but we still
2340 // need to keep track of the keys for upcoming rollback/commit.
2341 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2342 ret_status = WriteBatchInternal::DeleteRange(
2343 rebuilding_trx_, column_family_id, begin_key, end_key);
2344 if (ret_status.ok()) {
2345 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
2346 }
2347 } else if (ret_status.ok()) {
2348 MaybeAdvanceSeq(false /* batch_boundary */);
2349 }
2350 if (UNLIKELY(ret_status.IsTryAgain())) {
2351 DecrementProtectionInfoIdxForTryAgain();
2352 }
2353 return ret_status;
2354 }
2355 assert(ret_status.ok());
2356
2357 if (db_ != nullptr) {
2358 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
2359 if (cf_handle == nullptr) {
2360 cf_handle = db_->DefaultColumnFamily();
2361 }
2362 auto* cfd =
2363 static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
2364 if (!cfd->is_delete_range_supported()) {
2365 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
2366 ret_status.PermitUncheckedError();
2367 return Status::NotSupported(
2368 std::string("DeleteRange not supported for table type ") +
2369 cfd->ioptions()->table_factory->Name() + " in CF " +
2370 cfd->GetName());
2371 }
2372 int cmp =
2373 cfd->user_comparator()->CompareWithoutTimestamp(begin_key, end_key);
2374 if (cmp > 0) {
2375 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
2376 ret_status.PermitUncheckedError();
2377 // It's an empty range where endpoints appear mistaken. Don't bother
2378 // applying it to the DB, and return an error to the user.
2379 return Status::InvalidArgument("end key comes before start key");
2380 } else if (cmp == 0) {
2381 // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
2382 ret_status.PermitUncheckedError();
2383 // It's an empty range. Don't bother applying it to the DB.
2384 return Status::OK();
2385 }
2386 }
2387
2388 if (kv_prot_info != nullptr) {
2389 auto mem_kv_prot_info =
2390 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2391 ret_status = DeleteImpl(column_family_id, begin_key, end_key,
2392 kTypeRangeDeletion, &mem_kv_prot_info);
2393 } else {
2394 ret_status = DeleteImpl(column_family_id, begin_key, end_key,
2395 kTypeRangeDeletion, nullptr /* kv_prot_info */);
2396 }
2397 // optimize for non-recovery mode
2398 // If `ret_status` is `TryAgain` then the next (successful) try will add
2399 // the key to the rebuilding transaction object. If `ret_status` is
2400 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2401 // away. So we only need to add to it when `ret_status.ok()`.
2402 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
2403 assert(!write_after_commit_);
2404 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2405 ret_status = WriteBatchInternal::DeleteRange(
2406 rebuilding_trx_, column_family_id, begin_key, end_key);
2407 }
2408 if (UNLIKELY(ret_status.IsTryAgain())) {
2409 DecrementProtectionInfoIdxForTryAgain();
2410 }
2411 return ret_status;
2412 }
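// The endpoint validation above means, for a hypothetical default-CF
// batch applied to a DB:
//
//   batch.DeleteRange("b", "a");  // surfaces InvalidArgument at insert
//   batch.DeleteRange("a", "a");  // accepted but applies nothing (OK)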
2413
2414 Status MergeCF(uint32_t column_family_id, const Slice& key,
2415 const Slice& value) override {
2416 const auto* kv_prot_info = NextProtectionInfo();
2417 // optimize for non-recovery mode
2418 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
2419 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2420 return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
2421 value);
2422 // else insert the values into the memtable right away
2423 }
2424
2425 Status ret_status;
2426 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
2427 if (ret_status.ok() && rebuilding_trx_ != nullptr) {
2428 assert(!write_after_commit_);
2429 // The CF is probably flushed and hence no need for insert but we still
2430 // need to keep track of the keys for upcoming rollback/commit.
2431 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2432 ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
2433 column_family_id, key, value);
2434 if (ret_status.ok()) {
2435 MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
2436 }
2437 } else if (ret_status.ok()) {
2438 MaybeAdvanceSeq(false /* batch_boundary */);
2439 }
2440 if (UNLIKELY(ret_status.IsTryAgain())) {
2441 DecrementProtectionInfoIdxForTryAgain();
2442 }
2443 return ret_status;
2444 }
2445 assert(ret_status.ok());
2446
2447 MemTable* mem = cf_mems_->GetMemTable();
2448 auto* moptions = mem->GetImmutableMemTableOptions();
2449 if (moptions->merge_operator == nullptr) {
2450 return Status::InvalidArgument(
2451 "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
2452 }
2453 bool perform_merge = false;
2454 assert(!concurrent_memtable_writes_ ||
2455 moptions->max_successive_merges == 0);
2456
2457 // If we pass the DB through and options.max_successive_merges is hit
2458 // during recovery, a Get() will be issued, which will try to acquire
2459 // the DB mutex and cause a deadlock, as the DB mutex is already held.
2460 // So we disable merge in recovery.
2461 if (moptions->max_successive_merges > 0 && db_ != nullptr &&
2462 recovering_log_number_ == 0) {
2463 assert(!concurrent_memtable_writes_);
2464 LookupKey lkey(key, sequence_);
2465
2466 // Count the number of successive merges at the head
2467 // of the key in the memtable
2468 size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
2469
2470 if (num_merges >= moptions->max_successive_merges) {
2471 perform_merge = true;
2472 }
2473 }
2474
2475 if (perform_merge) {
2476 // 1) Get the existing value
2477 std::string get_value;
2478
2479 // Pass in the sequence number so that we also include previous merge
2480 // operations in the same batch.
2481 SnapshotImpl read_from_snapshot;
2482 read_from_snapshot.number_ = sequence_;
2483 ReadOptions read_options;
2484 read_options.snapshot = &read_from_snapshot;
2485
2486 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
2487 if (cf_handle == nullptr) {
2488 cf_handle = db_->DefaultColumnFamily();
2489 }
2490 Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
2491 if (!get_status.ok()) {
2492 // Failed to read a key we know exists. Store the delta in memtable.
2493 perform_merge = false;
2494 } else {
2495 Slice get_value_slice = Slice(get_value);
2496
2497 // 2) Apply this merge
2498 auto merge_operator = moptions->merge_operator;
2499 assert(merge_operator);
2500
2501 std::string new_value;
2502 Status merge_status = MergeHelper::TimedFullMerge(
2503 merge_operator, key, &get_value_slice, {value}, &new_value,
2504 moptions->info_log, moptions->statistics,
2505 SystemClock::Default().get(), /* result_operand */ nullptr,
2506 /* update_num_ops_stats */ false);
2507
2508 if (!merge_status.ok()) {
2509 // Failed to merge!
2510 // Store the delta in memtable
2511 perform_merge = false;
2512 } else {
2513 // 3) Add value to memtable
2514 assert(!concurrent_memtable_writes_);
2515 if (kv_prot_info != nullptr) {
2516 auto merged_kv_prot_info =
2517 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2518 merged_kv_prot_info.UpdateV(value, new_value);
2519 merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue);
2520 ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
2521 &merged_kv_prot_info);
2522 } else {
2523 ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
2524 nullptr /* kv_prot_info */);
2525 }
2526 }
2527 }
2528 }
2529
2530 if (!perform_merge) {
2531 assert(ret_status.ok());
2532 // Add merge operand to memtable
2533 if (kv_prot_info != nullptr) {
2534 auto mem_kv_prot_info =
2535 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2536 ret_status =
2537 mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
2538 concurrent_memtable_writes_, get_post_process_info(mem));
2539 } else {
2540 ret_status = mem->Add(
2541 sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
2542 concurrent_memtable_writes_, get_post_process_info(mem));
2543 }
2544 }
2545
2546 if (UNLIKELY(ret_status.IsTryAgain())) {
2547 assert(seq_per_batch_);
2548 const bool kBatchBoundary = true;
2549 MaybeAdvanceSeq(kBatchBoundary);
2550 } else if (ret_status.ok()) {
2551 MaybeAdvanceSeq();
2552 CheckMemtableFull();
2553 }
2554 // optimize for non-recovery mode
2555 // If `ret_status` is `TryAgain` then the next (successful) try will add
2556 // the key to the rebuilding transaction object. If `ret_status` is
2557 // another non-OK `Status`, then the `rebuilding_trx_` will be thrown
2558 // away. So we only need to add to it when `ret_status.ok()`.
2559 if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
2560 assert(!write_after_commit_);
2561 // TODO(ajkr): propagate `ProtectionInfoKVOS64`.
2562 ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
2563 key, value);
2564 }
2565 if (UNLIKELY(ret_status.IsTryAgain())) {
2566 DecrementProtectionInfoIdxForTryAgain();
2567 }
2568 return ret_status;
2569 }
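// A minimal configuration sketch for the merge path above; the
// string-append operator from RocksDB's bundled utilities is an
// arbitrary choice for illustration:
//
//   Options options;
//   options.merge_operator = MergeOperators::CreateStringAppendOperator();
//   // Once a key accumulates 10 consecutive merge operands in the
//   // memtable, MergeCF folds them into one full value via
//   // MergeHelper::TimedFullMerge instead of appending another operand.
//   options.max_successive_merges = 10;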
2570
2571 Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
2572 const Slice& value) override {
2573 const auto* kv_prot_info = NextProtectionInfo();
2574 Status ret_status;
2575 if (kv_prot_info != nullptr) {
2576 // Memtable needs seqno, doesn't need CF ID
2577 auto mem_kv_prot_info =
2578 kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
2579 // Same as PutCF except for value type.
2580 ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
2581 &mem_kv_prot_info);
2582 } else {
2583 ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
2584 nullptr /* kv_prot_info */);
2585 }
2586 if (UNLIKELY(ret_status.IsTryAgain())) {
2587 DecrementProtectionInfoIdxForTryAgain();
2588 }
2589 return ret_status;
2590 }
2591
2592 void CheckMemtableFull() {
2593 if (flush_scheduler_ != nullptr) {
2594 auto* cfd = cf_mems_->current();
2595 assert(cfd != nullptr);
2596 if (cfd->mem()->ShouldScheduleFlush() &&
2597 cfd->mem()->MarkFlushScheduled()) {
2598 // MarkFlushScheduled only returns true if we are the one that
2599 // should take action, so no need to dedup further
2600 flush_scheduler_->ScheduleWork(cfd);
2601 }
2602 }
2603 // check if memtable_list size exceeds max_write_buffer_size_to_maintain
2604 if (trim_history_scheduler_ != nullptr) {
2605 auto* cfd = cf_mems_->current();
2606
2607 assert(cfd);
2608 assert(cfd->ioptions());
2609
2610 const size_t size_to_maintain = static_cast<size_t>(
2611 cfd->ioptions()->max_write_buffer_size_to_maintain);
2612
2613 if (size_to_maintain > 0) {
2614 MemTableList* const imm = cfd->imm();
2615 assert(imm);
2616
2617 if (imm->HasHistory()) {
2618 const MemTable* const mem = cfd->mem();
2619 assert(mem);
2620
2621 if (mem->MemoryAllocatedBytes() +
2622 imm->MemoryAllocatedBytesExcludingLast() >=
2623 size_to_maintain &&
2624 imm->MarkTrimHistoryNeeded()) {
2625 trim_history_scheduler_->ScheduleWork(cfd);
2626 }
2627 }
2628 }
2629 }
2630 }
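// The trim check above implements
// ColumnFamilyOptions::max_write_buffer_size_to_maintain: once the
// mutable memtable plus the immutable list (excluding its last entry)
// reaches that budget, historical memtables become eligible for trimming
// and work is scheduled on the trim_history_scheduler_.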
2631
2632 // The write batch handler calls MarkBeginPrepare with unprepare set to true
2633 // if it encounters the kTypeBeginUnprepareXID marker.
2634 Status MarkBeginPrepare(bool unprepare) override {
2635 assert(rebuilding_trx_ == nullptr);
2636 assert(db_);
2637
2638 if (recovering_log_number_ != 0) {
2639 db_->mutex()->AssertHeld();
2640 // during recovery we rebuild a hollow transaction
2641 // from all encountered prepare sections of the wal
2642 if (db_->allow_2pc() == false) {
2643 return Status::NotSupported(
2644 "WAL contains prepared transactions. Open with "
2645 "TransactionDB::Open().");
2646 }
2647
2648 // we are now iterating through a prepared section
2649 rebuilding_trx_ = new WriteBatch();
2650 rebuilding_trx_seq_ = sequence_;
2651 // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
2652 // unprepared_batch_ should be false because it is false by default, and
2653 // gets reset to false in MarkEndPrepare.
2654 assert(!unprepared_batch_);
2655 unprepared_batch_ = unprepare;
2656
2657 if (has_valid_writes_ != nullptr) {
2658 *has_valid_writes_ = true;
2659 }
2660 }
2661
2662 return Status::OK();
2663 }
2664
2665 Status MarkEndPrepare(const Slice& name) override {
2666 assert(db_);
2667 assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
2668
2669 if (recovering_log_number_ != 0) {
2670 db_->mutex()->AssertHeld();
2671 assert(db_->allow_2pc());
2672 size_t batch_cnt =
2673 write_after_commit_
2674 ? 0 // 0 will disable further checks
2675 : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
2676 db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
2677 rebuilding_trx_, rebuilding_trx_seq_,
2678 batch_cnt, unprepared_batch_);
2679 unprepared_batch_ = false;
2680 rebuilding_trx_ = nullptr;
2681 } else {
2682 assert(rebuilding_trx_ == nullptr);
2683 }
2684 const bool batch_boundary = true;
2685 MaybeAdvanceSeq(batch_boundary);
2686
2687 return Status::OK();
2688 }
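// For orientation, a recovered two-phase transaction appears in the WAL
// as a tag sequence dispatched to the Mark* callbacks of this class:
//
//   kTypeBeginPrepareXID              -> MarkBeginPrepare()
//   ... Put/Delete/Merge records ...
//   kTypeEndPrepareXID    varstring   -> MarkEndPrepare(xid)
//   ... (possibly other batches) ...
//   kTypeCommitXID        varstring   -> MarkCommit(xid)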
2689
2690 Status MarkNoop(bool empty_batch) override {
2691 if (recovering_log_number_ != 0) {
2692 db_->mutex()->AssertHeld();
2693 }
2694 // A hack in pessimistic transactions could result in a noop at the start
2695 // of the write batch, which should be ignored.
2696 if (!empty_batch) {
2697 // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
2698 // a batch. This happens when a write batch commits while skipping the
2699 // prepare phase.
2700 const bool batch_boundary = true;
2701 MaybeAdvanceSeq(batch_boundary);
2702 }
2703 return Status::OK();
2704 }
2705
2706 Status MarkCommit(const Slice& name) override {
2707 assert(db_);
2708
2709 Status s;
2710
2711 if (recovering_log_number_ != 0) {
2712 // We must hold db mutex in recovery.
2713 db_->mutex()->AssertHeld();
2714 // in recovery, when we encounter a commit marker,
2715 // we look up this transaction in our set of rebuilt transactions
2716 // and commit it.
2717 auto trx = db_->GetRecoveredTransaction(name.ToString());
2718
2719 // the log containing the prepared section may have
2720 // been released in the last incarnation because the
2721 // data was flushed to L0
2722 if (trx != nullptr) {
2723 // at this point individual CF lognumbers will prevent
2724 // duplicate re-insertion of values.
2725 assert(log_number_ref_ == 0);
2726 if (write_after_commit_) {
2727 // write_after_commit_ can only have one batch in trx.
2728 assert(trx->batches_.size() == 1);
2729 const auto& batch_info = trx->batches_.begin()->second;
2730 // all inserts must reference this trx log number
2731 log_number_ref_ = batch_info.log_number_;
2732 ResetProtectionInfo();
2733 s = batch_info.batch_->Iterate(this);
2734 log_number_ref_ = 0;
2735 }
2736 // else the values are already inserted before the commit
2737
2738 if (s.ok()) {
2739 db_->DeleteRecoveredTransaction(name.ToString());
2740 }
2741 if (has_valid_writes_ != nullptr) {
2742 *has_valid_writes_ = true;
2743 }
2744 }
2745 } else {
2746 // When writes are not delayed until commit, there is no disconnect
2747 // between a memtable write and the WAL that supports it. So the commit
2748 // need not reference any log as the only log on which it depends.
2749 assert(!write_after_commit_ || log_number_ref_ > 0);
2750 }
2751 const bool batch_boundary = true;
2752 MaybeAdvanceSeq(batch_boundary);
2753
2754 if (UNLIKELY(s.IsTryAgain())) {
2755 DecrementProtectionInfoIdxForTryAgain();
2756 }
2757
2758 return s;
2759 }
2760
2761 Status MarkCommitWithTimestamp(const Slice& name,
2762 const Slice& commit_ts) override {
2763 assert(db_);
2764
2765 Status s;
2766
2767 if (recovering_log_number_ != 0) {
2768 // In recovery, db mutex must be held.
2769 db_->mutex()->AssertHeld();
2770 // in recovery, when we encounter a commit marker,
2771 // we look up this transaction in our set of rebuilt transactions
2772 // and commit it.
2773 auto trx = db_->GetRecoveredTransaction(name.ToString());
2774 // the log containing the prepared section may have
2775 // been released in the last incarnation because the
2776 // data was flushed to L0
2777 if (trx) {
2778 // at this point individual CF lognumbers will prevent
2779 // duplicate re-insertion of values.
2780 assert(0 == log_number_ref_);
2781 if (write_after_commit_) {
2782 // write_after_commit_ can only have one batch in trx.
2783 assert(trx->batches_.size() == 1);
2784 const auto& batch_info = trx->batches_.begin()->second;
2785 // all inserts must reference this trx log number
2786 log_number_ref_ = batch_info.log_number_;
2787
2788 s = batch_info.batch_->UpdateTimestamps(
2789 commit_ts, [this](uint32_t cf) {
2790 assert(db_);
2791 VersionSet* const vset = db_->GetVersionSet();
2792 assert(vset);
2793 ColumnFamilySet* const cf_set = vset->GetColumnFamilySet();
2794 assert(cf_set);
2795 ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf);
2796 assert(cfd);
2797 const auto* const ucmp = cfd->user_comparator();
2798 assert(ucmp);
2799 return ucmp->timestamp_size();
2800 });
2801 if (s.ok()) {
2802 ResetProtectionInfo();
2803 s = batch_info.batch_->Iterate(this);
2804 log_number_ref_ = 0;
2805 }
2806 }
2807 // else the values are already inserted before the commit
2808
2809 if (s.ok()) {
2810 db_->DeleteRecoveredTransaction(name.ToString());
2811 }
2812 if (has_valid_writes_) {
2813 *has_valid_writes_ = true;
2814 }
2815 }
2816 } else {
2817 // When writes are not delayed until commit, there is no disconnect
2818 // between a memtable write and the WAL that supports it. So the commit
2819 // need not reference any log as the only log on which it depends.
2820 assert(!write_after_commit_ || log_number_ref_ > 0);
2821 }
2822 constexpr bool batch_boundary = true;
2823 MaybeAdvanceSeq(batch_boundary);
2824
2825 if (UNLIKELY(s.IsTryAgain())) {
2826 DecrementProtectionInfoIdxForTryAgain();
2827 }
2828
2829 return s;
2830 }
2831
2832 Status MarkRollback(const Slice& name) override {
2833 assert(db_);
2834
2835 if (recovering_log_number_ != 0) {
2836 auto trx = db_->GetRecoveredTransaction(name.ToString());
2837
2838 // the log containing the transaction's prep section
2839 // may have been released in the previous incarnation
2840 // because we knew it had been rolled back
2841 if (trx != nullptr) {
2842 db_->DeleteRecoveredTransaction(name.ToString());
2843 }
2844 } else {
2845 // in non-recovery we simply ignore this tag
2846 }
2847
2848 const bool batch_boundary = true;
2849 MaybeAdvanceSeq(batch_boundary);
2850
2851 return Status::OK();
2852 }
2853
2854 private:
2855 MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
2856 if (!concurrent_memtable_writes_) {
2857 // No need to batch counters locally if we don't use concurrent mode.
2858 return nullptr;
2859 }
2860 return &GetPostMap()[mem];
2861 }
2862 };
2863
2864 } // anonymous namespace
2865
2866 // This function can only be called in these conditions:
2867 // 1) During Recovery()
2868 // 2) During Write(), in a single-threaded write thread
2869 // 3) During Write(), in a concurrent context where memtables have been cloned
2870 // The reason is that it calls memtables->Seek(), which has a stateful cache
2871 Status WriteBatchInternal::InsertInto(
2872 WriteThread::WriteGroup& write_group, SequenceNumber sequence,
2873 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
2874 TrimHistoryScheduler* trim_history_scheduler,
2875 bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
2876 bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
2877 MemTableInserter inserter(
2878 sequence, memtables, flush_scheduler, trim_history_scheduler,
2879 ignore_missing_column_families, recovery_log_number, db,
2880 concurrent_memtable_writes, nullptr /* prot_info */,
2881 nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
2882 for (auto w : write_group) {
2883 if (w->CallbackFailed()) {
2884 continue;
2885 }
2886 w->sequence = inserter.sequence();
2887 if (!w->ShouldWriteToMemtable()) {
2888 // In seq_per_batch_ mode this advances the seq by one.
2889 inserter.MaybeAdvanceSeq(true);
2890 continue;
2891 }
2892 SetSequence(w->batch, inserter.sequence());
2893 inserter.set_log_number_ref(w->log_ref);
2894 inserter.set_prot_info(w->batch->prot_info_.get());
2895 w->status = w->batch->Iterate(&inserter);
2896 if (!w->status.ok()) {
2897 return w->status;
2898 }
2899 assert(!seq_per_batch || w->batch_cnt != 0);
2900 assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
2901 }
2902 return Status::OK();
2903 }
2904
2905 Status WriteBatchInternal::InsertInto(
2906 WriteThread::Writer* writer, SequenceNumber sequence,
2907 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
2908 TrimHistoryScheduler* trim_history_scheduler,
2909 bool ignore_missing_column_families, uint64_t log_number, DB* db,
2910 bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
2911 bool batch_per_txn, bool hint_per_batch) {
2912 #ifdef NDEBUG
2913 (void)batch_cnt;
2914 #endif
2915 assert(writer->ShouldWriteToMemtable());
2916 MemTableInserter inserter(sequence, memtables, flush_scheduler,
2917 trim_history_scheduler,
2918 ignore_missing_column_families, log_number, db,
2919 concurrent_memtable_writes, nullptr /* prot_info */,
2920 nullptr /*has_valid_writes*/, seq_per_batch,
2921 batch_per_txn, hint_per_batch);
2922 SetSequence(writer->batch, sequence);
2923 inserter.set_log_number_ref(writer->log_ref);
2924 inserter.set_prot_info(writer->batch->prot_info_.get());
2925 Status s = writer->batch->Iterate(&inserter);
2926 assert(!seq_per_batch || batch_cnt != 0);
2927 assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
2928 if (concurrent_memtable_writes) {
2929 inserter.PostProcess();
2930 }
2931 return s;
2932 }
2933
2934 Status WriteBatchInternal::InsertInto(
2935 const WriteBatch* batch, ColumnFamilyMemTables* memtables,
2936 FlushScheduler* flush_scheduler,
2937 TrimHistoryScheduler* trim_history_scheduler,
2938 bool ignore_missing_column_families, uint64_t log_number, DB* db,
2939 bool concurrent_memtable_writes, SequenceNumber* next_seq,
2940 bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
2941 MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
2942 trim_history_scheduler,
2943 ignore_missing_column_families, log_number, db,
2944 concurrent_memtable_writes, batch->prot_info_.get(),
2945 has_valid_writes, seq_per_batch, batch_per_txn);
2946 Status s = batch->Iterate(&inserter);
2947 if (next_seq != nullptr) {
2948 *next_seq = inserter.sequence();
2949 }
2950 if (concurrent_memtable_writes) {
2951 inserter.PostProcess();
2952 }
2953 return s;
2954 }
2955
2956 namespace {
2957
2958 // This class updates protection info for a WriteBatch.
2959 class ProtectionInfoUpdater : public WriteBatch::Handler {
2960 public:
2961 explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info)
2962 : prot_info_(prot_info) {}
2963
2964 ~ProtectionInfoUpdater() override {}
2965
2966 Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
2967 return UpdateProtInfo(cf, key, val, kTypeValue);
2968 }
2969
2970 Status PutEntityCF(uint32_t cf, const Slice& key,
2971 const Slice& entity) override {
2972 return UpdateProtInfo(cf, key, entity, kTypeWideColumnEntity);
2973 }
2974
2975 Status DeleteCF(uint32_t cf, const Slice& key) override {
2976 return UpdateProtInfo(cf, key, "", kTypeDeletion);
2977 }
2978
2979 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
2980 return UpdateProtInfo(cf, key, "", kTypeSingleDeletion);
2981 }
2982
2983 Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
2984 const Slice& end_key) override {
2985 return UpdateProtInfo(cf, begin_key, end_key, kTypeRangeDeletion);
2986 }
2987
2988 Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
2989 return UpdateProtInfo(cf, key, val, kTypeMerge);
2990 }
2991
2992 Status PutBlobIndexCF(uint32_t cf, const Slice& key,
2993 const Slice& val) override {
2994 return UpdateProtInfo(cf, key, val, kTypeBlobIndex);
2995 }
2996
2997 Status MarkBeginPrepare(bool /* unprepare */) override {
2998 return Status::OK();
2999 }
3000
3001 Status MarkEndPrepare(const Slice& /* xid */) override {
3002 return Status::OK();
3003 }
3004
3005 Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); }
3006
3007 Status MarkCommitWithTimestamp(const Slice& /* xid */,
3008 const Slice& /* ts */) override {
3009 return Status::OK();
3010 }
3011
3012 Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); }
3013
3014 Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); }
3015
3016 private:
3017 Status UpdateProtInfo(uint32_t cf, const Slice& key, const Slice& val,
3018 const ValueType op_type) {
3019 if (prot_info_) {
3020 prot_info_->entries_.emplace_back(
3021 ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf));
3022 }
3023 return Status::OK();
3024 }
3025
3026 // No copy or move.
3027 ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
3028 ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
3029 ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete;
3030 ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete;
3031
3032 WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
3033 };
3034
3035 } // anonymous namespace
3036
3037 Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
3038 assert(contents.size() >= WriteBatchInternal::kHeader);
3039 assert(b->prot_info_ == nullptr);
3040
3041 b->rep_.assign(contents.data(), contents.size());
3042 b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
3043 return Status::OK();
3044 }
3045
3046 Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
3047 const bool wal_only) {
3048 assert(dst->Count() == 0 ||
3049 (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
3050 if ((src->prot_info_ != nullptr &&
3051 src->prot_info_->entries_.size() != src->Count()) ||
3052 (dst->prot_info_ != nullptr &&
3053 dst->prot_info_->entries_.size() != dst->Count())) {
3054 return Status::Corruption(
3055 "Write batch has inconsistent count and number of checksums");
3056 }
3057
3058 size_t src_len;
3059 int src_count;
3060 uint32_t src_flags;
3061
3062 const SavePoint& batch_end = src->GetWalTerminationPoint();
3063
3064 if (wal_only && !batch_end.is_cleared()) {
3065 src_len = batch_end.size - WriteBatchInternal::kHeader;
3066 src_count = batch_end.count;
3067 src_flags = batch_end.content_flags;
3068 } else {
3069 src_len = src->rep_.size() - WriteBatchInternal::kHeader;
3070 src_count = Count(src);
3071 src_flags = src->content_flags_.load(std::memory_order_relaxed);
3072 }
3073
3074 if (src->prot_info_ != nullptr) {
3075 if (dst->prot_info_ == nullptr) {
3076 dst->prot_info_.reset(new WriteBatch::ProtectionInfo());
3077 }
3078 std::copy(src->prot_info_->entries_.begin(),
3079 src->prot_info_->entries_.begin() + src_count,
3080 std::back_inserter(dst->prot_info_->entries_));
3081 } else if (dst->prot_info_ != nullptr) {
3082 // dst has an empty prot_info_->entries_
3083 // In this special case, we allow a write batch without prot_info to
3084 // be appended to a write batch with empty prot_info
3085 dst->prot_info_ = nullptr;
3086 }
3087 SetCount(dst, Count(dst) + src_count);
3088 assert(src->rep_.size() >= WriteBatchInternal::kHeader);
3089 dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
3090 dst->content_flags_.store(
3091 dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
3092 std::memory_order_relaxed);
3093 return Status::OK();
3094 }
3095
3096 size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
3097 size_t rightByteSize) {
3098 if (leftByteSize == 0 || rightByteSize == 0) {
3099 return leftByteSize + rightByteSize;
3100 } else {
3101 return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
3102 }
3103 }
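// Worked example: appending a 40-byte batch onto a 52-byte batch yields
// 52 + 40 - kHeader = 80 bytes, because each operand carries its own
// 12-byte header (fixed64 sequence + fixed32 count) while the combined
// batch needs only one.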
3104
3105 Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
3106 size_t bytes_per_key,
3107 uint64_t* checksum) {
3108 if (bytes_per_key == 0) {
3109 if (wb->prot_info_ != nullptr) {
3110 wb->prot_info_.reset();
3111 return Status::OK();
3112 } else {
3113 // Already not protected.
3114 return Status::OK();
3115 }
3116 } else if (bytes_per_key == 8) {
3117 if (wb->prot_info_ == nullptr) {
3118 wb->prot_info_.reset(new WriteBatch::ProtectionInfo());
3119 ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get());
3120 Status s = wb->Iterate(&prot_info_updater);
3121 if (s.ok() && checksum != nullptr) {
3122 uint64_t expected_hash = XXH3_64bits(wb->rep_.data(), wb->rep_.size());
3123 if (expected_hash != *checksum) {
3124 return Status::Corruption("Write batch content corrupted.");
3125 }
3126 }
3127 return s;
3128 } else {
3129 // Already protected.
3130 return Status::OK();
3131 }
3132 }
3133 return Status::NotSupported(
3134 "WriteBatch protection info must be zero or eight bytes/key");
3135 }
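// Callers usually reach this through the write path. A minimal sketch,
// assuming an open DB and a populated batch:
//
//   WriteOptions wo;
//   wo.protection_bytes_per_key = 8;  // 0 disables; only 0 and 8 supported
//   Status s = db->Write(wo, &batch);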
3136
3137 } // namespace ROCKSDB_NAMESPACE