// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeRangeDeletion varstring varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring
//    kTypeColumnFamilyRangeDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeBeginPrepareXID varstring
//    kTypeEndPrepareXID
//    kTypeCommitXID varstring
//    kTypeRollbackXID varstring
//    kTypeBeginPersistedPrepareXID varstring
//    kTypeBeginUnprepareXID varstring
//    kTypeNoop
// varstring :=
//    len: varint32
//    data: uint8[len]

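// Illustrative layout sketch (editor's note, not part of the original
// source): a batch holding a single Put("foo", "bar") in the default column
// family is the 12-byte header followed by one kTypeValue record:
//
//   bytes  0..7   sequence  (fixed64, little-endian, filled in at write time)
//   bytes  8..11  count     (fixed32) = 1
//   byte   12     kTypeValue tag (0x1)
//   byte   13     varint32 key length = 3, then "foo"
//   byte   17     varint32 value length = 3, then "bar"
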
#include "rocksdb/write_batch.h"

#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
#include "util/util.h"

namespace rocksdb {

// anon namespace for file-local types
namespace {

enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
  HAS_BLOB_INDEX = 1 << 10,
  HAS_BEGIN_UNPREPARE = 1 << 11,
};

struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_BLOB_INDEX;
    return Status::OK();
  }

  Status MarkBeginPrepare(bool unprepare) override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    if (unprepare) {
      content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
    }
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};

}  // anon namespace
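
// Usage sketch (editor's note, illustrative only): WriteBatch::Iterate()
// drives a user-supplied Handler such as the classifier above. A minimal
// client-side handler that just counts Puts could look like:
//
//   struct PutCounter : public rocksdb::WriteBatch::Handler {
//     int puts = 0;
//     rocksdb::Status PutCF(uint32_t /*cf*/, const rocksdb::Slice& /*k*/,
//                           const rocksdb::Slice& /*v*/) override {
//       puts++;
//       return rocksdb::Status::OK();
//     }
//   };
//
//   rocksdb::WriteBatch batch;
//   batch.Put("k1", "v1");
//   PutCounter counter;
//   batch.Iterate(&counter);  // counter.puts == 1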

struct SavePoints {
  std::stack<SavePoint> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() {
  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
                   reserved_bytes : WriteBatchInternal::kHeader);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(const std::string& rep)
    : save_points_(nullptr),
      content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep) {}

WriteBatch::WriteBatch(std::string&& rep)
    : save_points_(nullptr),
      content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)) {}

WriteBatch::WriteBatch(const WriteBatch& src)
    : save_points_(src.save_points_),
      wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_) {}

WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(std::move(src.rep_)) {}

WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}

WriteBatch::~WriteBatch() { delete save_points_; }

WriteBatch::Handler::~Handler() { }

void WriteBatch::Handler::LogData(const Slice& /*blob*/) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

bool WriteBatch::Handler::Continue() {
  return true;
}

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    while (!save_points_->stack.empty()) {
      save_points_->stack.pop();
    }
  }

  wal_term_point_.clear();
}

int WriteBatch::Count() const {
  return WriteBatchInternal::Count(this);
}

uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    Iterate(&classifier);
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}
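
// Editor's note (illustrative): content flags are computed lazily. A batch
// constructed from a raw rep string starts out as ContentFlags::DEFERRED, so
// the first call to HasPut()/HasDelete()/... below replays the batch through
// BatchContentClassifier once and caches the result in content_flags_.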

void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}

bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Skip tag byte
  input->remove_prefix(1);

  if (cf_record) {
    // Skip column_family bytes
    uint32_t cf;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  // Extract key
  return GetLengthPrefixedSlice(input, key);
}

bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}

Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeColumnFamilyBlobIndex:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // This indicates that the prepared batch is also persisted in the db.
      // This is used in WritePreparedTxn
    case kTypeBeginPersistedPrepareXID:
      // This is used in WriteUnpreparedTxn
    case kTypeBeginUnprepareXID:
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}

Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(WriteBatchInternal::kHeader);
  Slice key, value, blob, xid;
  // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
  // the batch boundary symbols otherwise we would mis-count the number of
  // batches. We do that by checking whether the accumulated batch is empty
  // before seeing the next Noop.
  bool empty_batch = true;
  int found = 0;
  Status s;
  char tag = 0;
  uint32_t column_family = 0;  // default
  bool last_was_try_again = false;
  bool handler_continue = true;
  while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
    handler_continue = handler->Continue();
    if (!handler_continue) {
      break;
    }

    if (LIKELY(!s.IsTryAgain())) {
      last_was_try_again = false;
      tag = 0;
      column_family = 0;  // default

      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      assert(s.IsTryAgain());
      assert(!last_was_try_again);  // to detect infinite loop bugs
      if (UNLIKELY(last_was_try_again)) {
        return Status::Corruption(
            "two consecutive TryAgain in WriteBatch handler; this is either a "
            "software bug or data corruption.");
      }
      last_was_try_again = true;
      s = Status::OK();
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          empty_batch = false;
          found++;
        }
        break;
      case kTypeColumnFamilyBlobIndex:
      case kTypeBlobIndex:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
        s = handler->PutBlobIndexCF(column_family, key, value);
        if (LIKELY(s.ok())) {
          found++;
        }
        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
        empty_batch = false;
        break;
      case kTypeBeginPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        empty_batch = false;
        if (!handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_after_commit_ is disabled (in "
              "WritePrepared/WriteUnprepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        if (handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteCommitted txn tag when write_before_prepare_ is enabled "
              "(in WriteUnprepared mode). If it is not due to corruption, the "
              "WAL must be emptied before changing the WritePolicy.");
        }
        break;
      case kTypeBeginPersistedPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
              "is enabled (in default WriteCommitted mode). If it is not due "
              "to corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeBeginUnprepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
        handler->MarkBeginPrepare(true /* unprepared */);
        empty_batch = false;
        if (handler->WriteAfterCommit()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_after_commit_ is enabled (in "
              "default WriteCommitted mode). If it is not due to corruption, "
              "the WAL must be emptied before changing the WritePolicy.");
        }
        if (!handler->WriteBeforePrepare()) {
          s = Status::NotSupported(
              "WriteUnprepared txn tag when write_before_prepare_ is disabled "
              "(in WriteCommitted/WritePrepared mode). If it is not due to "
              "corruption, the WAL must be emptied before changing the "
              "WritePolicy.");
        }
        break;
      case kTypeEndPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        handler->MarkEndPrepare(xid);
        empty_batch = true;
        break;
      case kTypeCommitXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        handler->MarkCommit(xid);
        empty_batch = true;
        break;
      case kTypeRollbackXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        handler->MarkRollback(xid);
        empty_batch = true;
        break;
      case kTypeNoop:
        handler->MarkNoop(empty_batch);
        empty_batch = true;
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (handler_continue && found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}
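
// Editor's note (illustrative): the TryAgain contract in Iterate() means a
// handler may reject a record once (e.g. MemTableInserter when a duplicate
// key+seq is detected in seq_per_batch mode); the loop then skips the read
// step and re-delivers the same record on the next iteration, after the
// handler has had a chance to adjust its state. Two consecutive TryAgain
// results for the same record are treated as corruption.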

bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  return b->is_latest_persistent_state_;
}

void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  b->is_latest_persistent_state_ = true;
}

int WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  return WriteBatchInternal::kHeader;
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}
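
// Editor's note (illustrative usage sketch): the two encodings above differ
// only in the record tag. For example, assuming a handle whose GetID() is 7:
//
//   rocksdb::WriteBatch wb;
//   wb.Put(default_cf, "a", "1");  // appends kTypeValue + key + value
//   wb.Put(other_cf, "a", "1");    // appends kTypeColumnFamilyValue +
//                                  // varint32(7) + key + value
//
// default_cf / other_cf are hypothetical ColumnFamilyHandle* values obtained
// from DB::Open() or DB::CreateColumnFamily().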

Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  size_t total_key_bytes = 0;
  for (int i = 0; i < key.num_parts; ++i) {
    total_key_bytes += key.parts[i].size();
  }
  if (total_key_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }

  size_t total_value_bytes = 0;
  for (int i = 0; i < value.num_parts; ++i) {
    total_value_bytes += value.parts[i].size();
  }
  if (total_value_bytes >= size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}

Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
      std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}

Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}

Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  // a manually constructed batch can only contain one prepare section
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(
      write_after_commit ? kTypeBeginPrepareXID
                         : (unprepared_batch ? kTypeBeginUnprepareXID
                                             : kTypeBeginPersistedPrepareXID));
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                                ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}
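
// Editor's note (illustrative): after MarkEndPrepare, a prepared transaction
// batch built by the transaction layer looks roughly like
//
//   header | Begin*PrepareXID marker (rewritten over the Noop at offset 12)
//          | the transaction's Put/Delete/Merge records
//          | kTypeEndPrepareXID varstring(xid)
//
// and is later followed, in a separate batch, by kTypeCommitXID xid or
// kTypeRollbackXID xid. Which Begin* marker is written depends on the
// write_after_commit / unprepared_batch flags above.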

Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  if (key.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > size_t{port::kMaxUint32}) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  Status s = CheckSlicePartsLength(key, value);
  if (!s.ok()) {
    return s;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}

Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);
  return save.commit();
}

Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}
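
// Editor's note (illustrative): PutLogData adds a WAL-only record. It is not
// counted in Count(), is never applied to a memtable, and is surfaced only to
// Handler::LogData() during Iterate(), e.g.:
//
//   rocksdb::WriteBatch wb;
//   wb.Put("key", "value");
//   wb.PutLogData("replication-metadata");  // visible to WAL consumers only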

void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_ = new SavePoints();
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}

Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(savepoint.count <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    rep_.resize(savepoint.size);
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}

Status WriteBatch::PopSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  save_points_->stack.pop();

  return Status::OK();
}
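
// Editor's note (illustrative usage sketch of the savepoint API above):
//
//   rocksdb::WriteBatch wb;
//   wb.Put("a", "1");
//   wb.SetSavePoint();
//   wb.Put("b", "2");
//   wb.RollbackToSavePoint();  // drops the Put of "b"; wb.Count() == 1
//   wb.SetSavePoint();
//   wb.PopSavePoint();         // forgets the savepoint, keeps the data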

class MemTableInserter : public WriteBatch::Handler {

  SequenceNumber sequence_;
  ColumnFamilyMemTables* const cf_mems_;
  FlushScheduler* const flush_scheduler_;
  const bool ignore_missing_column_families_;
  const uint64_t recovering_log_number_;
  // log number that all Memtables inserted into should reference
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool post_info_created_;

  bool* has_valid_writes_;
  // On some (!) platforms just default creating
  // a map is too expensive in the Write() path as they
  // cause memory allocations though unused.
  // Make creation optional but do not incur
  // std::unique_ptr additional allocation
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  PostMapType mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)
  WriteBatch* rebuilding_trx_;
  SequenceNumber rebuilding_trx_seq_;
  // Increase seq number once per each write batch. Otherwise increase it once
  // per key.
  bool seq_per_batch_;
  // Whether the memtable write will be done only after the commit
  bool write_after_commit_;
  // Whether memtable write can be done before prepare
  bool write_before_prepare_;
  // Whether this batch was unprepared or not
  bool unprepared_batch_;
  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  DupDetector duplicate_detector_;
  bool dup_dectector_on_;

  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if(!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }

  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_dectector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_dectector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>
        (&duplicate_detector_)->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

 protected:
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }

 public:
  // cf_mems should not be shared with concurrent inserters
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(reinterpret_cast<DBImpl*>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        // Write after commit currently uses one seq per key (instead of per
        // batch). So seq_per_batch being false indicates write_after_commit
        // approach.
        write_after_commit_(!seq_per_batch),
        // WriteUnprepared can write WriteBatches per transaction, so
        // batch_per_txn being false indicates write_before_prepare.
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_dectector_on_(false) {
    assert(cf_mems_);
  }

  ~MemTableInserter() override {
    if (dup_dectector_on_) {
      reinterpret_cast<DuplicateDetector*>
          (&duplicate_detector_)->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>
          (&mem_post_info_map_)->~MemPostInfoMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // The batch seq is regularly restarted; In normal mode it is set when
  // MemTableInserter is constructed in the write thread and in recovery mode it
  // is set when a batch, which is tagged with seq, is read from the WAL.
  // Within a sequenced batch, which could be a merge of multiple batches, we
  // have two policies to advance the seq: i) seq_per_key (default) and ii)
  // seq_per_batch. To implement the latter we need to mark the boundary between
  // the individual batches. The approach is this: 1) Use the terminating
  // markers to indicate the boundary (kTypeEndPrepareXID, kTypeCommitXID,
  // kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in the absence of a
  // natural boundary marker.
  void MaybeAdvanceSeq(bool batch_boundry = false) {
    if (batch_boundry == seq_per_batch_) {
      sequence_++;
    }
  }
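
  // Editor's note (illustrative): with the default seq_per_key policy
  // (seq_per_batch_ == false) the condition above is true for ordinary keys
  // (batch_boundry == false), so the sequence advances once per key and the
  // boundary markers do not consume a number. With seq_per_batch_ == true the
  // opposite holds: the keys of a sub-batch share one number and only a
  // boundary call, MaybeAdvanceSeq(true), advances it.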
1165
7c673cae
FG
1166 void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
1167
11fdf7f2 1168 SequenceNumber sequence() const { return sequence_; }
7c673cae
FG
1169
1170 void PostProcess() {
1171 assert(concurrent_memtable_writes_);
1172 // If post info was not created there is nothing
1173 // to process and no need to create on demand
1174 if(post_info_created_) {
1175 for (auto& pair : GetPostMap()) {
1176 pair.first->BatchPostProcess(pair.second);
1177 }
1178 }
1179 }
1180
1181 bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
1182 // If we are in a concurrent mode, it is the caller's responsibility
1183 // to clone the original ColumnFamilyMemTables so that each thread
1184 // has its own instance. Otherwise, it must be guaranteed that there
1185 // is no concurrent access
1186 bool found = cf_mems_->Seek(column_family_id);
1187 if (!found) {
1188 if (ignore_missing_column_families_) {
1189 *s = Status::OK();
1190 } else {
1191 *s = Status::InvalidArgument(
1192 "Invalid column family specified in write batch");
1193 }
1194 return false;
1195 }
1196 if (recovering_log_number_ != 0 &&
1197 recovering_log_number_ < cf_mems_->GetLogNumber()) {
1198 // This is true only in recovery environment (recovering_log_number_ is
1199 // always 0 in
1200 // non-recovery, regular write code-path)
1201 // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
1202 // column
1203 // family already contains updates from this log. We can't apply updates
1204 // twice because of update-in-place or merge workloads -- ignore the
1205 // update
1206 *s = Status::OK();
1207 return false;
1208 }
1209
1210 if (has_valid_writes_ != nullptr) {
1211 *has_valid_writes_ = true;
1212 }
1213
1214 if (log_number_ref_ > 0) {
1215 cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
1216 }
1217
1218 return true;
1219 }
1220
11fdf7f2
TL
1221 Status PutCFImpl(uint32_t column_family_id, const Slice& key,
1222 const Slice& value, ValueType value_type) {
1223 // optimize for non-recovery mode
1224 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
7c673cae
FG
1225 WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
1226 return Status::OK();
11fdf7f2 1227 // else insert the values to the memtable right away
7c673cae
FG
1228 }
1229
1230 Status seek_status;
11fdf7f2
TL
1231 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1232 bool batch_boundry = false;
1233 if (rebuilding_trx_ != nullptr) {
1234 assert(!write_after_commit_);
1235 // The CF is probably flushed and hence no need for insert but we still
1236 // need to keep track of the keys for upcoming rollback/commit.
1237 WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
1238 batch_boundry = IsDuplicateKeySeq(column_family_id, key);
1239 }
1240 MaybeAdvanceSeq(batch_boundry);
7c673cae
FG
1241 return seek_status;
1242 }
11fdf7f2 1243 Status ret_status;
7c673cae
FG
1244
1245 MemTable* mem = cf_mems_->GetMemTable();
11fdf7f2
TL
1246 auto* moptions = mem->GetImmutableMemTableOptions();
1247 // inplace_update_support is inconsistent with snapshots, and therefore with
1248 // any kind of transactions including the ones that use seq_per_batch
1249 assert(!seq_per_batch_ || !moptions->inplace_update_support);
7c673cae 1250 if (!moptions->inplace_update_support) {
11fdf7f2
TL
1251 bool mem_res =
1252 mem->Add(sequence_, value_type, key, value,
1253 concurrent_memtable_writes_, get_post_process_info(mem));
1254 if (UNLIKELY(!mem_res)) {
1255 assert(seq_per_batch_);
1256 ret_status = Status::TryAgain("key+seq exists");
1257 const bool BATCH_BOUNDRY = true;
1258 MaybeAdvanceSeq(BATCH_BOUNDRY);
1259 }
7c673cae
FG
1260 } else if (moptions->inplace_callback == nullptr) {
1261 assert(!concurrent_memtable_writes_);
1262 mem->Update(sequence_, key, value);
7c673cae
FG
1263 } else {
1264 assert(!concurrent_memtable_writes_);
1265 if (mem->UpdateCallback(sequence_, key, value)) {
1266 } else {
1267 // key not found in memtable. Do sst get, update, add
1268 SnapshotImpl read_from_snapshot;
1269 read_from_snapshot.number_ = sequence_;
1270 ReadOptions ropts;
11fdf7f2
TL
1271 // it's going to be overwritten for sure, so no point caching data block
1272 // containing the old version
1273 ropts.fill_cache = false;
7c673cae
FG
1274 ropts.snapshot = &read_from_snapshot;
1275
1276 std::string prev_value;
1277 std::string merged_value;
1278
1279 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1280 Status s = Status::NotSupported();
1281 if (db_ != nullptr && recovering_log_number_ == 0) {
1282 if (cf_handle == nullptr) {
1283 cf_handle = db_->DefaultColumnFamily();
1284 }
1285 s = db_->Get(ropts, cf_handle, key, &prev_value);
1286 }
1287
1288 char* prev_buffer = const_cast<char*>(prev_value.c_str());
1289 uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
1290 auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
1291 s.ok() ? &prev_size : nullptr,
1292 value, &merged_value);
1293 if (status == UpdateStatus::UPDATED_INPLACE) {
1294 // prev_value is updated in-place with final value.
11fdf7f2
TL
1295 bool mem_res __attribute__((__unused__));
1296 mem_res = mem->Add(
1297 sequence_, value_type, key, Slice(prev_buffer, prev_size));
1298 assert(mem_res);
7c673cae
FG
1299 RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
1300 } else if (status == UpdateStatus::UPDATED) {
1301 // merged_value contains the final value.
11fdf7f2
TL
1302 bool mem_res __attribute__((__unused__));
1303 mem_res =
1304 mem->Add(sequence_, value_type, key, Slice(merged_value));
1305 assert(mem_res);
7c673cae
FG
1306 RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
1307 }
1308 }
1309 }
11fdf7f2
TL
1310 // optimize for non-recovery mode
1311 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1312 assert(!write_after_commit_);
1313 // If the ret_status is TryAgain then let the next try to add the ky to
1314 // the rebuilding transaction object.
1315 WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
1316 }
1317 // Since all Puts are logged in transaction logs (if enabled), always bump
7c673cae
FG
1318 // sequence number. Even if the update eventually fails and does not result
1319 // in memtable add/update.
11fdf7f2 1320 MaybeAdvanceSeq();
7c673cae 1321 CheckMemtableFull();
11fdf7f2
TL
1322 return ret_status;
1323 }
1324
494da23a
TL
1325 Status PutCF(uint32_t column_family_id, const Slice& key,
1326 const Slice& value) override {
11fdf7f2 1327 return PutCFImpl(column_family_id, key, value, kTypeValue);
7c673cae
FG
1328 }
1329
11fdf7f2 1330 Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
7c673cae 1331 const Slice& value, ValueType delete_type) {
11fdf7f2 1332 Status ret_status;
7c673cae 1333 MemTable* mem = cf_mems_->GetMemTable();
11fdf7f2
TL
1334 bool mem_res =
1335 mem->Add(sequence_, delete_type, key, value,
1336 concurrent_memtable_writes_, get_post_process_info(mem));
1337 if (UNLIKELY(!mem_res)) {
1338 assert(seq_per_batch_);
1339 ret_status = Status::TryAgain("key+seq exists");
1340 const bool BATCH_BOUNDRY = true;
1341 MaybeAdvanceSeq(BATCH_BOUNDRY);
1342 }
1343 MaybeAdvanceSeq();
7c673cae 1344 CheckMemtableFull();
11fdf7f2 1345 return ret_status;
7c673cae
FG
1346 }
1347
494da23a 1348 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
11fdf7f2
TL
1349 // optimize for non-recovery mode
1350 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
7c673cae
FG
1351 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1352 return Status::OK();
11fdf7f2 1353 // else insert the values to the memtable right away
7c673cae
FG
1354 }
1355
1356 Status seek_status;
11fdf7f2
TL
1357 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1358 bool batch_boundry = false;
1359 if (rebuilding_trx_ != nullptr) {
1360 assert(!write_after_commit_);
1361 // The CF is probably flushed and hence no need for insert but we still
1362 // need to keep track of the keys for upcoming rollback/commit.
1363 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1364 batch_boundry = IsDuplicateKeySeq(column_family_id, key);
1365 }
1366 MaybeAdvanceSeq(batch_boundry);
7c673cae
FG
1367 return seek_status;
1368 }
1369
11fdf7f2
TL
1370 auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
1371 // optimize for non-recovery mode
1372 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1373 assert(!write_after_commit_);
1374 // If the ret_status is TryAgain then let the next try to add the ky to
1375 // the rebuilding transaction object.
1376 WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
1377 }
1378 return ret_status;
7c673cae
FG
1379 }
1380
494da23a 1381 Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
11fdf7f2
TL
1382 // optimize for non-recovery mode
1383 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
7c673cae
FG
1384 WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
1385 return Status::OK();
11fdf7f2 1386 // else insert the values to the memtable right away
7c673cae
FG
1387 }
1388
1389 Status seek_status;
11fdf7f2
TL
1390 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1391 bool batch_boundry = false;
1392 if (rebuilding_trx_ != nullptr) {
1393 assert(!write_after_commit_);
1394 // The CF is probably flushed and hence no need for insert but we still
1395 // need to keep track of the keys for upcoming rollback/commit.
1396 WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
1397 key);
1398 batch_boundry = IsDuplicateKeySeq(column_family_id, key);
1399 }
1400 MaybeAdvanceSeq(batch_boundry);
7c673cae
FG
1401 return seek_status;
1402 }
1403
11fdf7f2
TL
1404 auto ret_status =
1405 DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
1406 // optimize for non-recovery mode
1407 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1408 assert(!write_after_commit_);
1409 // If the ret_status is TryAgain then let the next try to add the ky to
1410 // the rebuilding transaction object.
1411 WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
1412 }
1413 return ret_status;
7c673cae
FG
1414 }
1415
494da23a
TL
1416 Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
1417 const Slice& end_key) override {
11fdf7f2
TL
1418 // optimize for non-recovery mode
1419 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
7c673cae
FG
1420 WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1421 begin_key, end_key);
1422 return Status::OK();
11fdf7f2 1423 // else insert the values to the memtable right away
7c673cae
FG
1424 }
1425
1426 Status seek_status;
11fdf7f2
TL
1427 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1428 bool batch_boundry = false;
1429 if (rebuilding_trx_ != nullptr) {
1430 assert(!write_after_commit_);
1431 // The CF is probably flushed and hence no need for insert but we still
1432 // need to keep track of the keys for upcoming rollback/commit.
1433 WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1434 begin_key, end_key);
1435 // TODO(myabandeh): when transactional DeleteRange support is added,
1436 // check if end_key must also be added.
1437 batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
1438 }
1439 MaybeAdvanceSeq(batch_boundry);
7c673cae
FG
1440 return seek_status;
1441 }
1442 if (db_ != nullptr) {
1443 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1444 if (cf_handle == nullptr) {
1445 cf_handle = db_->DefaultColumnFamily();
1446 }
1447 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
1448 if (!cfd->is_delete_range_supported()) {
1449 return Status::NotSupported(
1450 std::string("DeleteRange not supported for table type ") +
1451 cfd->ioptions()->table_factory->Name() + " in CF " +
1452 cfd->GetName());
1453 }
1454 }
1455
11fdf7f2
TL
1456 auto ret_status =
1457 DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
1458 // optimize for non-recovery mode
1459 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1460 assert(!write_after_commit_);
1461 // If the ret_status is TryAgain then let the next try to add the ky to
1462 // the rebuilding transaction object.
1463 WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
1464 begin_key, end_key);
1465 }
1466 return ret_status;
7c673cae
FG
1467 }
1468
494da23a
TL
1469 Status MergeCF(uint32_t column_family_id, const Slice& key,
1470 const Slice& value) override {
7c673cae 1471 assert(!concurrent_memtable_writes_);
11fdf7f2
TL
1472 // optimize for non-recovery mode
1473 if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
7c673cae
FG
1474 WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
1475 return Status::OK();
11fdf7f2 1476 // else insert the values to the memtable right away
7c673cae
FG
1477 }
1478
1479 Status seek_status;
11fdf7f2
TL
1480 if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) {
1481 bool batch_boundry = false;
1482 if (rebuilding_trx_ != nullptr) {
1483 assert(!write_after_commit_);
1484 // The CF is probably flushed and hence no need for insert but we still
1485 // need to keep track of the keys for upcoming rollback/commit.
1486 WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
1487 value);
1488 batch_boundry = IsDuplicateKeySeq(column_family_id, key);
1489 }
1490 MaybeAdvanceSeq(batch_boundry);
7c673cae
FG
1491 return seek_status;
1492 }
1493
11fdf7f2 1494 Status ret_status;
7c673cae 1495 MemTable* mem = cf_mems_->GetMemTable();
11fdf7f2 1496 auto* moptions = mem->GetImmutableMemTableOptions();
7c673cae
FG
1497 bool perform_merge = false;
1498
1499 // If we pass DB through and options.max_successive_merges is hit
1500 // during recovery, Get() will be issued which will try to acquire
1501 // DB mutex and cause deadlock, as DB mutex is already held.
1502 // So we disable merge in recovery
1503 if (moptions->max_successive_merges > 0 && db_ != nullptr &&
1504 recovering_log_number_ == 0) {
1505 LookupKey lkey(key, sequence_);
1506
1507 // Count the number of successive merges at the head
1508 // of the key in the memtable
1509 size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
1510
1511 if (num_merges >= moptions->max_successive_merges) {
1512 perform_merge = true;
1513 }
1514 }
1515
1516 if (perform_merge) {
1517 // 1) Get the existing value
1518 std::string get_value;
1519
1520 // Pass in the sequence number so that we also include previous merge
1521 // operations in the same batch.
1522 SnapshotImpl read_from_snapshot;
1523 read_from_snapshot.number_ = sequence_;
1524 ReadOptions read_options;
1525 read_options.snapshot = &read_from_snapshot;
1526
1527 auto cf_handle = cf_mems_->GetColumnFamilyHandle();
1528 if (cf_handle == nullptr) {
1529 cf_handle = db_->DefaultColumnFamily();
1530 }
1531 db_->Get(read_options, cf_handle, key, &get_value);
1532 Slice get_value_slice = Slice(get_value);
1533
1534 // 2) Apply this merge
1535 auto merge_operator = moptions->merge_operator;
1536 assert(merge_operator);
1537
1538 std::string new_value;
1539
1540 Status merge_status = MergeHelper::TimedFullMerge(
1541 merge_operator, key, &get_value_slice, {value}, &new_value,
1542 moptions->info_log, moptions->statistics, Env::Default());
1543
1544 if (!merge_status.ok()) {
1545 // Failed to merge!
1546 // Store the delta in memtable
1547 perform_merge = false;
1548 } else {
1549 // 3) Add value to memtable
11fdf7f2
TL
1550 bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
1551 if (UNLIKELY(!mem_res)) {
1552 assert(seq_per_batch_);
1553 ret_status = Status::TryAgain("key+seq exists");
1554 const bool BATCH_BOUNDRY = true;
1555 MaybeAdvanceSeq(BATCH_BOUNDRY);
1556 }
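 // A false return from MemTable::Add here means an entry with the same key
 // and sequence number already exists, which should only happen with
 // duplicate keys in a seq_per_batch batch; returning TryAgain lets the
 // batch iteration replay this record after the sequence has advanced to
 // the next sub-batch.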
1557 }
1558 }
1559
1560 if (!perform_merge) {
1561 // Add merge operator to memtable
1562 bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
1563 if (UNLIKELY(!mem_res)) {
1564 assert(seq_per_batch_);
1565 ret_status = Status::TryAgain("key+seq exists");
1566 const bool BATCH_BOUNDRY = true;
1567 MaybeAdvanceSeq(BATCH_BOUNDRY);
1568 }
1569 }
1570
1571 // optimize for non-recovery mode
1572 if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
1573 assert(!write_after_commit_);
 1574 // If the ret_status is TryAgain then let the next try add the key to
1575 // the rebuilding transaction object.
1576 WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
1577 }
1578 MaybeAdvanceSeq();
 1579 CheckMemtableFull();
 1580 return ret_status;
1581 }
1582
1583 Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
1584 const Slice& value) override {
1585 // Same as PutCF except for value type.
1586 return PutCFImpl(column_family_id, key, value, kTypeBlobIndex);
1587 }
1588
1589 void CheckMemtableFull() {
1590 if (flush_scheduler_ != nullptr) {
1591 auto* cfd = cf_mems_->current();
1592 assert(cfd != nullptr);
1593 if (cfd->mem()->ShouldScheduleFlush() &&
1594 cfd->mem()->MarkFlushScheduled()) {
1595 // MarkFlushScheduled only returns true if we are the one that
1596 // should take action, so no need to dedup further
1597 flush_scheduler_->ScheduleFlush(cfd);
1598 }
1599 }
1600 }
1601
1602 // The write batch handler calls MarkBeginPrepare with unprepare set to true
1603 // if it encounters the kTypeBeginUnprepareXID marker.
1604 Status MarkBeginPrepare(bool unprepare) override {
1605 assert(rebuilding_trx_ == nullptr);
1606 assert(db_);
1607
1608 if (recovering_log_number_ != 0) {
1609 // during recovery we rebuild a hollow transaction
 1610 // from all encountered prepare sections of the WAL
1611 if (db_->allow_2pc() == false) {
1612 return Status::NotSupported(
1613 "WAL contains prepared transactions. Open with "
1614 "TransactionDB::Open().");
1615 }
1616
1617 // we are now iterating through a prepared section
1618 rebuilding_trx_ = new WriteBatch();
1619 rebuilding_trx_seq_ = sequence_;
1620 // We only call MarkBeginPrepare once per batch, and unprepared_batch_
1621 // is initialized to false by default.
1622 assert(!unprepared_batch_);
1623 unprepared_batch_ = unprepare;
1624
1625 if (has_valid_writes_ != nullptr) {
1626 *has_valid_writes_ = true;
1627 }
1628 }
1629
1630 return Status::OK();
1631 }
1632
1633 Status MarkEndPrepare(const Slice& name) override {
1634 assert(db_);
1635 assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
1636
1637 if (recovering_log_number_ != 0) {
1638 assert(db_->allow_2pc());
1639 size_t batch_cnt =
1640 write_after_commit_
1641 ? 0 // 0 will disable further checks
1642 : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
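 // For example, if the prepare section started at rebuilding_trx_seq_ == 10
 // and its last sub-batch advanced sequence_ to 12, batch_cnt is 3, so the
 // recovered transaction is expected to contain three sub-batches.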
 1643 db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
 1644 rebuilding_trx_, rebuilding_trx_seq_,
 1645 batch_cnt, unprepared_batch_);
1646 rebuilding_trx_ = nullptr;
1647 } else {
1648 assert(rebuilding_trx_ == nullptr);
 1649 }
 1650 const bool batch_boundry = true;
 1651 MaybeAdvanceSeq(batch_boundry);
1652
1653 return Status::OK();
1654 }
1655
1656 Status MarkNoop(bool empty_batch) override {
 1657 // A hack in pessimistic transactions could result in a noop at the start
 1658 // of the write batch; it should be ignored.
1659 if (!empty_batch) {
 1660 // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
 1661 // a batch. This happens when a write batch commits while skipping the
 1662 // prepare phase.
1663 const bool batch_boundry = true;
1664 MaybeAdvanceSeq(batch_boundry);
1665 }
1666 return Status::OK();
1667 }
1668
1669 Status MarkCommit(const Slice& name) override {
1670 assert(db_);
1671
1672 Status s;
1673
1674 if (recovering_log_number_ != 0) {
1675 // in recovery when we encounter a commit marker
1676 // we lookup this transaction in our set of rebuilt transactions
1677 // and commit.
1678 auto trx = db_->GetRecoveredTransaction(name.ToString());
1679
 1680 // the log containing the prepared section may have
 1681 // been released in the last incarnation because the
 1682 // data was flushed to L0
1683 if (trx != nullptr) {
 1684 // at this point individual CF log numbers will prevent
1685 // duplicate re-insertion of values.
1686 assert(log_number_ref_ == 0);
1687 if (write_after_commit_) {
1688 // write_after_commit_ can only have one batch in trx.
1689 assert(trx->batches_.size() == 1);
1690 const auto& batch_info = trx->batches_.begin()->second;
1691 // all inserts must reference this trx log number
1692 log_number_ref_ = batch_info.log_number_;
1693 s = batch_info.batch_->Iterate(this);
1694 log_number_ref_ = 0;
1695 }
1696 // else the values are already inserted before the commit
1697
1698 if (s.ok()) {
1699 db_->DeleteRecoveredTransaction(name.ToString());
1700 }
1701 if (has_valid_writes_ != nullptr) {
1702 *has_valid_writes_ = true;
1703 }
1704 }
1705 } else {
 1706 // When writes are not delayed until commit, there is no disconnect between
 1707 // a memtable write and the WAL that supports it, so the commit does not
 1708 // need to reference any particular log as the one it depends on.
1709 assert(!write_after_commit_ || log_number_ref_ > 0);
 1710 }
 1711 const bool batch_boundry = true;
 1712 MaybeAdvanceSeq(batch_boundry);
1713
1714 return s;
1715 }
1716
1717 Status MarkRollback(const Slice& name) override {
1718 assert(db_);
1719
1720 if (recovering_log_number_ != 0) {
1721 auto trx = db_->GetRecoveredTransaction(name.ToString());
1722
 1723 // the log containing the transaction's prep section
1724 // may have been released in the previous incarnation
1725 // because we knew it had been rolled back
1726 if (trx != nullptr) {
1727 db_->DeleteRecoveredTransaction(name.ToString());
1728 }
1729 } else {
 1730 // in non-recovery mode we simply ignore this tag
1731 }
1732
1733 const bool batch_boundry = true;
1734 MaybeAdvanceSeq(batch_boundry);
1735
1736 return Status::OK();
1737 }
1738
1739 private:
1740 MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
1741 if (!concurrent_memtable_writes_) {
1742 // No need to batch counters locally if we don't use concurrent mode.
1743 return nullptr;
1744 }
1745 return &GetPostMap()[mem];
1746 }
1747};
1748
1749// This function can only be called in these conditions:
1750// 1) During Recovery()
1751// 2) During Write(), in a single-threaded write thread
1752// 3) During Write(), in a concurrent context where memtables have been cloned
1753// The reason is that it calls memtables->Seek(), which has a stateful cache
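// (Seek() remembers the column family it last resolved, so sharing one
// ColumnFamilyMemTables instance across unsynchronized threads would race.)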
1754Status WriteBatchInternal::InsertInto(
 1755 WriteThread::WriteGroup& write_group, SequenceNumber sequence,
 1756 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
1757 bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
1758 bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
1759 MemTableInserter inserter(
1760 sequence, memtables, flush_scheduler, ignore_missing_column_families,
1761 recovery_log_number, db, concurrent_memtable_writes,
1762 nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
1763 for (auto w : write_group) {
1764 if (w->CallbackFailed()) {
1765 continue;
1766 }
1767 w->sequence = inserter.sequence();
 1768 if (!w->ShouldWriteToMemtable()) {
 1769 // In seq_per_batch_ mode this advances the seq by one.
 1770 inserter.MaybeAdvanceSeq(true);
1771 continue;
1772 }
 1773 SetSequence(w->batch, inserter.sequence());
 1774 inserter.set_log_number_ref(w->log_ref);
1775 w->status = w->batch->Iterate(&inserter);
1776 if (!w->status.ok()) {
1777 return w->status;
1778 }
1779 assert(!seq_per_batch || w->batch_cnt != 0);
1780 assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
1781 }
1782 return Status::OK();
1783}
1784
1785Status WriteBatchInternal::InsertInto(
1786 WriteThread::Writer* writer, SequenceNumber sequence,
1787 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
1788 bool ignore_missing_column_families, uint64_t log_number, DB* db,
1789 bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
1790 bool batch_per_txn) {
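 // In release (NDEBUG) builds the asserts below compile away and batch_cnt
 // would otherwise be an unused parameter, so cast it to void.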
1791#ifdef NDEBUG
1792 (void)batch_cnt;
1793#endif
 1794 assert(writer->ShouldWriteToMemtable());
 1795 MemTableInserter inserter(
1796 sequence, memtables, flush_scheduler, ignore_missing_column_families,
1797 log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/,
1798 seq_per_batch, batch_per_txn);
1799 SetSequence(writer->batch, sequence);
1800 inserter.set_log_number_ref(writer->log_ref);
1801 Status s = writer->batch->Iterate(&inserter);
1802 assert(!seq_per_batch || batch_cnt != 0);
1803 assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
1804 if (concurrent_memtable_writes) {
1805 inserter.PostProcess();
1806 }
1807 return s;
1808}
1809
1810Status WriteBatchInternal::InsertInto(
1811 const WriteBatch* batch, ColumnFamilyMemTables* memtables,
1812 FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
1813 uint64_t log_number, DB* db, bool concurrent_memtable_writes,
1814 SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch,
1815 bool batch_per_txn) {
1816 MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
1817 ignore_missing_column_families, log_number, db,
1818 concurrent_memtable_writes, has_valid_writes,
1819 seq_per_batch, batch_per_txn);
 1820 Status s = batch->Iterate(&inserter);
 1821 if (next_seq != nullptr) {
 1822 *next_seq = inserter.sequence();
1823 }
1824 if (concurrent_memtable_writes) {
1825 inserter.PostProcess();
1826 }
1827 return s;
1828}
1829
1830Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
1831 assert(contents.size() >= WriteBatchInternal::kHeader);
1832 b->rep_.assign(contents.data(), contents.size());
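 // The raw contents are opaque here, so mark the flags as DEFERRED; they are
 // recomputed lazily from rep_ the next time they are queried.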
1833 b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
1834 return Status::OK();
1835}
1836
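// Append() copies src's records onto the end of dst, skipping src's header.
// When wal_only is set and src has a WAL termination save point, only the
// prefix of src up to that point is copied (its size, count and content
// flags come from the save point rather than from the full batch).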
1837Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
1838 const bool wal_only) {
1839 size_t src_len;
1840 int src_count;
1841 uint32_t src_flags;
1842
1843 const SavePoint& batch_end = src->GetWalTerminationPoint();
1844
1845 if (wal_only && !batch_end.is_cleared()) {
1846 src_len = batch_end.size - WriteBatchInternal::kHeader;
1847 src_count = batch_end.count;
1848 src_flags = batch_end.content_flags;
1849 } else {
1850 src_len = src->rep_.size() - WriteBatchInternal::kHeader;
1851 src_count = Count(src);
1852 src_flags = src->content_flags_.load(std::memory_order_relaxed);
1853 }
1854
1855 SetCount(dst, Count(dst) + src_count);
1856 assert(src->rep_.size() >= WriteBatchInternal::kHeader);
1857 dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
1858 dst->content_flags_.store(
1859 dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
1860 std::memory_order_relaxed);
1861 return Status::OK();
1862}
1863
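// Size of the result of appending two batches: both reps share a single
// header (kHeader = 12 bytes: an 8-byte sequence number plus a 4-byte count),
// so, for example, a 15-byte batch appended to a 20-byte batch occupies
// 20 + 15 - 12 = 23 bytes; if either size is zero the result is simply the sum.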
1864size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
1865 size_t rightByteSize) {
1866 if (leftByteSize == 0 || rightByteSize == 0) {
1867 return leftByteSize + rightByteSize;
1868 } else {
1869 return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
1870 }
1871}
1872
1873} // namespace rocksdb