1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/version_edit.h"
12 #include "db/blob_index.h"
13 #include "db/version_set.h"
14 #include "logging/event_logger.h"
15 #include "rocksdb/slice.h"
16 #include "test_util/sync_point.h"
17 #include "util/coding.h"
18 #include "util/string_util.h"
20 namespace ROCKSDB_NAMESPACE
{
// Checksum value used when a file's checksum is unknown (the empty string).
const std::string kUnknownFileChecksum("");
// Checksum function name used when the sst file's checksum function is
// unknown.
const std::string kUnknownFileChecksumFuncName("Unknown");
// Mask for an identified tag from the future which can be safely ignored
// (bit 13 of the tag value).
const uint32_t kTagSafeIgnoreMask = 1 << 13;
28 // Tag numbers for serialized VersionEdit. These numbers are written to
29 // disk and should not be changed. The number should be forward compatible so
30 // users can down-grade RocksDB safely. A future Tag is ignored by doing '&'
31 // between Tag and kTagSafeIgnoreMask field.
40 // 8 was used for large value refs
42 kMinLogNumberToKeep
= 10,
44 kDbId
= kTagSafeIgnoreMask
+ 1,
46 // these are new formats divergent from open source leveldb
49 kNewFile4
= 103, // 4th (the latest) format version of adding files
50 kColumnFamily
= 200, // specify column family for version edit
51 kColumnFamilyAdd
= 201,
52 kColumnFamilyDrop
= 202,
53 kMaxColumnFamily
= 203,
58 enum CustomTag
: uint32_t {
59 kTerminate
= 1, // The end of customized fields
61 // Since Manifest is not yet entirely forward-compatible, and the only
62 // forward-compatible part is the CustomTag of kNewFile, we currently encode
63 // kMinLogNumberToKeep as part of a CustomTag as a hack. This should be
64 // removed when manifest becomes forward-compatible.
65 kMinLogNumberToKeepHack
= 3,
66 kOldestBlobFileNumber
= 4,
67 kOldestAncesterTime
= 5,
68 kFileCreationTime
= 6,
70 kFileChecksumFuncName
= 8,
// If this bit for the custom tag is set, opening DB should fail if
// we don't know this field.
// Deliberately left non-const so the symbol's linkage and mutability stay
// exactly as before.
uint32_t kCustomTagNonSafeIgnoreMask = 1 << 6;
77 uint64_t PackFileNumberAndPathId(uint64_t number
, uint64_t path_id
) {
78 assert(number
<= kFileNumberMask
);
79 return number
| (path_id
* (kFileNumberMask
+ 1));
// Folds one key/value entry into this file's metadata.
// - If `smallest` is still empty it is decoded from `key`; `largest` is
//   decoded from `key` on every call.
// - fd.smallest_seqno / fd.largest_seqno are widened with std::min/std::max.
// - For kTypeBlobIndex entries the value is decoded as a blob index, and
//   oldest_blob_file_number is lowered to the referenced blob file number when
//   it is still kInvalidBlobFileNumber or larger than that number.
// NOTE(review): this view of the function looks truncated -- `seqno` and
// `blob_index` are used without a visible declaration, and the bodies of the
// IsInlined()/HasTTL()/invalid-number checks are not shown. Confirm against
// the full source.
82 void FileMetaData::UpdateBoundaries(const Slice
& key
, const Slice
& value
,
84 ValueType value_type
) {
// Only the first key seen initialises the lower boundary.
85 if (smallest
.size() == 0) {
86 smallest
.DecodeFrom(key
);
// The upper boundary is overwritten by every key.
88 largest
.DecodeFrom(key
);
89 fd
.smallest_seqno
= std::min(fd
.smallest_seqno
, seqno
);
90 fd
.largest_seqno
= std::max(fd
.largest_seqno
, seqno
);
// Blob references additionally update the oldest referenced blob file.
93 if (value_type
== kTypeBlobIndex
) {
95 const Status s
= blob_index
.DecodeFrom(value
);
100 if (blob_index
.IsInlined()) {
104 if (blob_index
.HasTTL()) {
108 // Paranoid check: this should not happen because BlobDB numbers the blob
109 // files starting from 1.
110 if (blob_index
.file_number() == kInvalidBlobFileNumber
) {
// Keep the smallest blob file number seen so far.
114 if (oldest_blob_file_number
== kInvalidBlobFileNumber
||
115 oldest_blob_file_number
> blob_index
.file_number()) {
116 oldest_blob_file_number
= blob_index
.file_number();
// Resets this edit: numeric fields go back to 0, every has_*/is_* flag goes
// back to false, and deleted_files_ and column_family_name_ are emptied.
// NOTE(review): only part of the member list is visible here; other members
// may be reset on lines that are not shown.
125 void VersionEdit::Clear() {
130 prev_log_number_
= 0;
131 next_file_number_
= 0;
132 max_column_family_
= 0;
133 min_log_number_to_keep_
= 0;
// Presence flags.
136 has_comparator_
= false;
137 has_log_number_
= false;
138 has_prev_log_number_
= false;
139 has_next_file_number_
= false;
140 has_max_column_family_
= false;
141 has_min_log_number_to_keep_
= false;
142 has_last_sequence_
= false;
143 deleted_files_
.clear();
// Column family and atomic group state.
146 is_column_family_add_
= false;
147 is_column_family_drop_
= false;
148 column_family_name_
.clear();
149 is_in_atomic_group_
= false;
150 remaining_entries_
= 0;
// Appends the serialized form of this edit to *dst.
// Each present field is written as a varint32 tag followed by its payload
// (varints or length-prefixed slices). Every entry of new_files_ is written
// as a kNewFile4 record: fixed leading fields, then a list of
// (custom tag, length-prefixed payload) pairs ended by CustomTag::kTerminate.
// NOTE(review): the return statements and some guards are not visible in this
// view, so the meaning of the bool result cannot be confirmed from here.
153 bool VersionEdit::EncodeTo(std::string
* dst
) const {
155 PutVarint32(dst
, kDbId
);
156 PutLengthPrefixedSlice(dst
, db_id_
);
158 if (has_comparator_
) {
159 PutVarint32(dst
, kComparator
);
160 PutLengthPrefixedSlice(dst
, comparator_
);
162 if (has_log_number_
) {
163 PutVarint32Varint64(dst
, kLogNumber
, log_number_
);
165 if (has_prev_log_number_
) {
166 PutVarint32Varint64(dst
, kPrevLogNumber
, prev_log_number_
);
168 if (has_next_file_number_
) {
169 PutVarint32Varint64(dst
, kNextFileNumber
, next_file_number_
);
171 if (has_max_column_family_
) {
172 PutVarint32Varint32(dst
, kMaxColumnFamily
, max_column_family_
);
174 if (has_last_sequence_
) {
175 PutVarint32Varint64(dst
, kLastSequence
, last_sequence_
);
// One kDeletedFile record per (level, file number) pair.
177 for (const auto& deleted
: deleted_files_
) {
178 PutVarint32Varint32Varint64(dst
, kDeletedFile
, deleted
.first
/* level */,
179 deleted
.second
/* file number */);
// Ensures kMinLogNumberToKeepHack is emitted at most once per edit: the flag
// is set after the first new-file record that carries it.
182 bool min_log_num_written
= false;
183 for (size_t i
= 0; i
< new_files_
.size(); i
++) {
184 const FileMetaData
& f
= new_files_
[i
].second
;
// NOTE(review): the body of this validity check is not visible here.
185 if (!f
.smallest
.Valid() || !f
.largest
.Valid()) {
// Fixed leading part of a kNewFile4 record: level, file number, file size,
// smallest/largest internal keys, smallest/largest sequence numbers.
188 PutVarint32(dst
, kNewFile4
);
189 PutVarint32Varint64(dst
, new_files_
[i
].first
/* level */, f
.fd
.GetNumber());
190 PutVarint64(dst
, f
.fd
.GetFileSize());
191 PutLengthPrefixedSlice(dst
, f
.smallest
.Encode());
192 PutLengthPrefixedSlice(dst
, f
.largest
.Encode());
193 PutVarint64Varint64(dst
, f
.fd
.smallest_seqno
, f
.fd
.largest_seqno
);
194 // Customized fields' format:
195 // +-----------------------------+
196 // | 1st field's tag (varint32) |
197 // +-----------------------------+
198 // | 1st field's size (varint32) |
199 // +-----------------------------+
200 // | bytes for 1st field |
201 // | (based on size decoded) |
202 // +-----------------------------+
206 // +-----------------------------+
207 // | last field's size (varint32)|
208 // +-----------------------------+
209 // | bytes for last field |
210 // | (based on size decoded) |
211 // +-----------------------------+
212 // | terminating tag (varint32) |
213 // +-----------------------------+
215 // Customized encoding for fields:
216 // tag kPathId: 1 byte as path_id
217 // tag kNeedCompaction:
218 // now only can take one char value 1 indicating need-compaction
// The time fields are first encoded into a temporary string, which is handed
// to a test sync point before being appended as a length-prefixed slice.
220 PutVarint32(dst
, CustomTag::kOldestAncesterTime
);
221 std::string varint_oldest_ancester_time
;
222 PutVarint64(&varint_oldest_ancester_time
, f
.oldest_ancester_time
);
223 TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintOldestAncesterTime",
224 &varint_oldest_ancester_time
);
225 PutLengthPrefixedSlice(dst
, Slice(varint_oldest_ancester_time
));
227 PutVarint32(dst
, CustomTag::kFileCreationTime
);
228 std::string varint_file_creation_time
;
229 PutVarint64(&varint_file_creation_time
, f
.file_creation_time
);
230 TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintFileCreationTime",
231 &varint_file_creation_time
);
232 PutLengthPrefixedSlice(dst
, Slice(varint_file_creation_time
));
234 PutVarint32(dst
, CustomTag::kFileChecksum
);
235 PutLengthPrefixedSlice(dst
, Slice(f
.file_checksum
));
237 PutVarint32(dst
, CustomTag::kFileChecksumFuncName
);
238 PutLengthPrefixedSlice(dst
, Slice(f
.file_checksum_func_name
));
// Path id is only written when non-zero, and is narrowed to a single byte.
240 if (f
.fd
.GetPathId() != 0) {
241 PutVarint32(dst
, CustomTag::kPathId
);
242 char p
= static_cast<char>(f
.fd
.GetPathId());
243 PutLengthPrefixedSlice(dst
, Slice(&p
, 1));
245 if (f
.marked_for_compaction
) {
246 PutVarint32(dst
, CustomTag::kNeedCompaction
);
247 char p
= static_cast<char>(1);
248 PutLengthPrefixedSlice(dst
, Slice(&p
, 1));
// NOTE: despite the local's name, the payload is a fixed 64-bit value
// (PutFixed64), not a varint.
250 if (has_min_log_number_to_keep_
&& !min_log_num_written
) {
251 PutVarint32(dst
, CustomTag::kMinLogNumberToKeepHack
);
252 std::string varint_log_number
;
253 PutFixed64(&varint_log_number
, min_log_number_to_keep_
);
254 PutLengthPrefixedSlice(dst
, Slice(varint_log_number
));
255 min_log_num_written
= true;
257 if (f
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
258 PutVarint32(dst
, CustomTag::kOldestBlobFileNumber
);
259 std::string oldest_blob_file_number
;
260 PutVarint64(&oldest_blob_file_number
, f
.oldest_blob_file_number
);
261 PutLengthPrefixedSlice(dst
, Slice(oldest_blob_file_number
));
263 TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
// End of the custom field list for this file.
266 PutVarint32(dst
, CustomTag::kTerminate
);
269 // 0 is default and does not need to be explicitly written
270 if (column_family_
!= 0) {
271 PutVarint32Varint32(dst
, kColumnFamily
, column_family_
);
274 if (is_column_family_add_
) {
275 PutVarint32(dst
, kColumnFamilyAdd
);
276 PutLengthPrefixedSlice(dst
, Slice(column_family_name_
));
// A drop record carries only its tag; no payload is written.
279 if (is_column_family_drop_
) {
280 PutVarint32(dst
, kColumnFamilyDrop
);
283 if (is_in_atomic_group_
) {
284 PutVarint32(dst
, kInAtomicGroup
);
285 PutVarint32(dst
, remaining_entries_
);
// Reads a length-prefixed slice from *input and decodes it into *dst.
// NOTE(review): the declaration of `str` and the return statements are not
// visible in this view; presumably the result mirrors the success of
// GetLengthPrefixedSlice -- confirm against the full source.
290 static bool GetInternalKey(Slice
* input
, InternalKey
* dst
) {
292 if (GetLengthPrefixedSlice(input
, &str
)) {
293 dst
->DecodeFrom(str
);
// Reads a varint32 from *input as a level and compares it with max_level_.
// The third parameter (an error-message out-pointer) is unused, as its
// commented-out name indicates.
// NOTE(review): the declaration of `v`, the store into *level, the body of
// the max_level_ comparison and the return statements are not visible here.
300 bool VersionEdit::GetLevel(Slice
* input
, int* level
, const char** /*msg*/) {
302 if (GetVarint32(input
, &v
)) {
304 if (max_level_
< *level
) {
// Detects a placeholder new-file record: level 0, file number 0, file size 0,
// has_min_log_number_to_keep_ set, and both boundary keys equal to an internal
// key built from "dummy_key" with sequence number 0 and type kTypeValue.
// NOTE(review): the `level` parameter declaration and the fall-through return
// are not visible in this view.
313 static bool is_pseudo_new_file_record_pr3488(
315 const uint64_t number
,
316 const uint64_t file_size
,
317 InternalKey
& smallest
,
318 InternalKey
& largest
,
319 const bool has_min_log_number_to_keep_
) {
321 if (level
== 0 && number
== 0 && file_size
== 0 &&
322 has_min_log_number_to_keep_
) {
// Compare the raw representations of both boundaries with the dummy key.
323 InternalKey
dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue
);
324 return (*smallest
.rep() == *dummy_key
.rep() &&
325 *largest
.rep() == *dummy_key
.rep());
// Decodes one kNewFile4 record from *input.
// Reads the fixed leading fields (level, file number, file size, smallest and
// largest internal keys, smallest and largest sequence numbers), then custom
// fields as (varint32 tag, length-prefixed payload) pairs until kTerminate.
// On success the file is appended to new_files_. The visible failure paths
// return a string literal naming the field that could not be parsed.
// NOTE(review): the loop header, several `case` labels, `break`s and the final
// return are not visible in this view.
331 const char* VersionEdit::DecodeNewFile4From(Slice
* input
) {
332 const char* msg
= nullptr;
336 uint32_t path_id
= 0;
337 uint64_t file_size
= 0;
338 SequenceNumber smallest_seqno
= 0;
339 SequenceNumber largest_seqno
= kMaxSequenceNumber
;
340 // Since this is the only forward-compatible part of the code, we hack new
341 // extension into this record. When we do, we set this boolean to distinguish
342 // the record from the normal NewFile records.
343 if (GetLevel(input
, &level
, &msg
) && GetVarint64(input
, &number
) &&
344 GetVarint64(input
, &file_size
) && GetInternalKey(input
, &f
.smallest
) &&
345 GetInternalKey(input
, &f
.largest
) &&
346 GetVarint64(input
, &smallest_seqno
) &&
347 GetVarint64(input
, &largest_seqno
)) {
348 // See comments in VersionEdit::EncodeTo() for format of customized fields
350 uint32_t custom_tag
= 0;
352 if (!GetVarint32(input
, &custom_tag
)) {
353 return "new-file4 custom field";
// kTerminate marks the end of the custom field list.
355 if (custom_tag
== kTerminate
) {
358 if (!GetLengthPrefixedSlice(input
, &field
)) {
359 return "new-file4 custom field length prefixed slice error";
361 switch (custom_tag
) {
363 if (field
.size() != 1) {
364 return "path_id field wrong size";
// NOTE(review): "vaue" looks like a typo for "value"; left unchanged because
// the string is returned to callers at runtime.
368 return "path_id wrong vaue";
371 case kOldestAncesterTime
:
372 if (!GetVarint64(&field
, &f
.oldest_ancester_time
)) {
373 return "invalid oldest ancester time";
376 case kFileCreationTime
:
377 if (!GetVarint64(&field
, &f
.file_creation_time
)) {
378 return "invalid file creation time";
382 f
.file_checksum
= field
.ToString();
384 case kFileChecksumFuncName
:
385 f
.file_checksum_func_name
= field
.ToString();
387 case kNeedCompaction
:
388 if (field
.size() != 1) {
389 return "need_compaction field wrong size";
// Only a payload byte equal to 1 marks the file for compaction.
391 f
.marked_for_compaction
= (field
[0] == 1);
393 case kMinLogNumberToKeepHack
:
394 // This is a hack to encode kMinLogNumberToKeep in a
395 // forward-compatible fashion.
396 if (!GetFixed64(&field
, &min_log_number_to_keep_
)) {
397 return "deleted log number malformatted";
399 has_min_log_number_to_keep_
= true;
401 case kOldestBlobFileNumber
:
402 if (!GetVarint64(&field
, &f
.oldest_blob_file_number
)) {
403 return "invalid oldest blob file number";
// Tags with the kCustomTagNonSafeIgnoreMask bit set are rejected here.
407 if ((custom_tag
& kCustomTagNonSafeIgnoreMask
) != 0) {
408 // Should not proceed if cannot understand it
409 return "new-file4 custom field not supported";
415 return "new-file4 entry";
417 if (is_pseudo_new_file_record_pr3488(level
, number
, file_size
,
418 f
.smallest
, f
.largest
,
419 has_min_log_number_to_keep_
)) {
420 // Since this has nothing to do with NewFile, return immediately.
424 FileDescriptor(number
, path_id
, file_size
, smallest_seqno
, largest_seqno
);
425 new_files_
.push_back(std::make_pair(level
, f
));
// Parses a serialized edit from `src` into this object.
// Reads varint32 tags in a loop and decodes the payload of each one. When a
// payload cannot be parsed, `msg` is set to a short description, which stops
// the loop (the loop condition requires msg == nullptr). A non-null `msg` is
// turned into Status::Corruption("VersionEdit", msg).
// Tags with the kTagSafeIgnoreMask bit set are skipped using their length
// prefix.
// NOTE(review): the switch header, several `case` labels, local declarations
// and the final return are not visible in this view.
429 Status
VersionEdit::DecodeFrom(const Slice
& src
) {
432 const char* msg
= nullptr;
435 // Temporary storage for parsing
440 while (msg
== nullptr && GetVarint32(&input
, &tag
)) {
443 if (GetLengthPrefixedSlice(&input
, &str
)) {
444 db_id_
= str
.ToString();
451 if (GetLengthPrefixedSlice(&input
, &str
)) {
452 comparator_
= str
.ToString();
453 has_comparator_
= true;
455 msg
= "comparator name";
460 if (GetVarint64(&input
, &log_number_
)) {
461 has_log_number_
= true;
468 if (GetVarint64(&input
, &prev_log_number_
)) {
469 has_prev_log_number_
= true;
471 msg
= "previous log number";
475 case kNextFileNumber
:
476 if (GetVarint64(&input
, &next_file_number_
)) {
477 has_next_file_number_
= true;
479 msg
= "next file number";
483 case kMaxColumnFamily
:
484 if (GetVarint32(&input
, &max_column_family_
)) {
485 has_max_column_family_
= true;
487 msg
= "max column family";
491 case kMinLogNumberToKeep
:
492 if (GetVarint64(&input
, &min_log_number_to_keep_
)) {
493 has_min_log_number_to_keep_
= true;
// NOTE(review): the message text ends in "kee" (probably "keep"); left
// unchanged because it is emitted at runtime.
495 msg
= "min log number to kee";
500 if (GetVarint64(&input
, &last_sequence_
)) {
501 has_last_sequence_
= true;
503 msg
= "last sequence number";
507 case kCompactPointer
:
508 if (GetLevel(&input
, &level
, &msg
) &&
509 GetInternalKey(&input
, &key
)) {
510 // we don't use compact pointers anymore,
511 // but we should not fail if they are still
515 msg
= "compaction pointer";
522 if (GetLevel(&input
, &level
, &msg
) && GetVarint64(&input
, &number
)) {
523 deleted_files_
.insert(std::make_pair(level
, number
));
526 msg
= "deleted file";
// Older new-file record formats follow; each builds a FileDescriptor from the
// fields it carries and appends the file to new_files_.
534 uint64_t file_size
= 0;
535 if (GetLevel(&input
, &level
, &msg
) && GetVarint64(&input
, &number
) &&
536 GetVarint64(&input
, &file_size
) &&
537 GetInternalKey(&input
, &f
.smallest
) &&
538 GetInternalKey(&input
, &f
.largest
)) {
539 f
.fd
= FileDescriptor(number
, 0, file_size
);
540 new_files_
.push_back(std::make_pair(level
, f
));
543 msg
= "new-file entry";
550 uint64_t file_size
= 0;
551 SequenceNumber smallest_seqno
= 0;
552 SequenceNumber largest_seqno
= kMaxSequenceNumber
;
553 if (GetLevel(&input
, &level
, &msg
) && GetVarint64(&input
, &number
) &&
554 GetVarint64(&input
, &file_size
) &&
555 GetInternalKey(&input
, &f
.smallest
) &&
556 GetInternalKey(&input
, &f
.largest
) &&
557 GetVarint64(&input
, &smallest_seqno
) &&
558 GetVarint64(&input
, &largest_seqno
)) {
559 f
.fd
= FileDescriptor(number
, 0, file_size
, smallest_seqno
,
561 new_files_
.push_back(std::make_pair(level
, f
));
564 msg
= "new-file2 entry";
572 uint32_t path_id
= 0;
573 uint64_t file_size
= 0;
574 SequenceNumber smallest_seqno
= 0;
575 SequenceNumber largest_seqno
= kMaxSequenceNumber
;
576 if (GetLevel(&input
, &level
, &msg
) && GetVarint64(&input
, &number
) &&
577 GetVarint32(&input
, &path_id
) && GetVarint64(&input
, &file_size
) &&
578 GetInternalKey(&input
, &f
.smallest
) &&
579 GetInternalKey(&input
, &f
.largest
) &&
580 GetVarint64(&input
, &smallest_seqno
) &&
581 GetVarint64(&input
, &largest_seqno
)) {
582 f
.fd
= FileDescriptor(number
, path_id
, file_size
, smallest_seqno
,
584 new_files_
.push_back(std::make_pair(level
, f
));
587 msg
= "new-file3 entry";
// The newest new-file format is handled by DecodeNewFile4From, which returns
// an error description or nullptr.
594 msg
= DecodeNewFile4From(&input
);
599 if (!GetVarint32(&input
, &column_family_
)) {
601 msg
= "set column family id";
606 case kColumnFamilyAdd
:
607 if (GetLengthPrefixedSlice(&input
, &str
)) {
608 is_column_family_add_
= true;
609 column_family_name_
= str
.ToString();
612 msg
= "column family add";
617 case kColumnFamilyDrop
:
618 is_column_family_drop_
= true;
622 is_in_atomic_group_
= true;
623 if (!GetVarint32(&input
, &remaining_entries_
)) {
625 msg
= "remaining entries";
631 if (tag
& kTagSafeIgnoreMask
) {
632 // Tag from future which can be safely ignored.
633 // The next field must be the length of the entry.
// The declared length must fit in the remaining input before it is skipped.
635 if (!GetVarint32(&input
, &field_len
) ||
636 static_cast<size_t>(field_len
) > input
.size()) {
638 msg
= "safely ignoreable tag length error";
641 input
.remove_prefix(static_cast<size_t>(field_len
));
// NOTE(review): the handling of leftover input is not visible in this view.
650 if (msg
== nullptr && !input
.empty()) {
// Any recorded parse failure is reported as a corruption status.
655 if (msg
!= nullptr) {
656 result
= Status::Corruption("VersionEdit", msg
);
// Builds a multi-line, human-readable dump of this edit in `r`.
// Optional fields are printed only when their has_*/is_* flag is set. Deleted
// files and added files are listed one per line. `hex_key` is forwarded to
// InternalKey::DebugString for the boundary keys.
// NOTE(review): the declaration of `r`, some separators, the closing text and
// the return are not visible in this view.
661 std::string
VersionEdit::DebugString(bool hex_key
) const {
663 r
.append("VersionEdit {");
665 r
.append("\n DB ID: ");
668 if (has_comparator_
) {
669 r
.append("\n Comparator: ");
670 r
.append(comparator_
);
672 if (has_log_number_
) {
673 r
.append("\n LogNumber: ");
674 AppendNumberTo(&r
, log_number_
);
676 if (has_prev_log_number_
) {
677 r
.append("\n PrevLogNumber: ");
678 AppendNumberTo(&r
, prev_log_number_
);
680 if (has_next_file_number_
) {
681 r
.append("\n NextFileNumber: ");
682 AppendNumberTo(&r
, next_file_number_
);
684 if (has_max_column_family_
) {
685 r
.append("\n MaxColumnFamily: ");
686 AppendNumberTo(&r
, max_column_family_
);
688 if (has_min_log_number_to_keep_
) {
689 r
.append("\n MinLogNumberToKeep: ");
690 AppendNumberTo(&r
, min_log_number_to_keep_
);
692 if (has_last_sequence_
) {
693 r
.append("\n LastSeq: ");
694 AppendNumberTo(&r
, last_sequence_
);
// One line per deleted file: level (first) then file number (second).
696 for (const auto& deleted_file
: deleted_files_
) {
697 r
.append("\n DeleteFile: ");
698 AppendNumberTo(&r
, deleted_file
.first
);
700 AppendNumberTo(&r
, deleted_file
.second
);
// One line per added file: level, file number, file size, key range, then
// the optional blob file number and the time/checksum fields.
702 for (size_t i
= 0; i
< new_files_
.size(); i
++) {
703 const FileMetaData
& f
= new_files_
[i
].second
;
704 r
.append("\n AddFile: ");
705 AppendNumberTo(&r
, new_files_
[i
].first
);
707 AppendNumberTo(&r
, f
.fd
.GetNumber());
709 AppendNumberTo(&r
, f
.fd
.GetFileSize());
711 r
.append(f
.smallest
.DebugString(hex_key
));
713 r
.append(f
.largest
.DebugString(hex_key
));
714 if (f
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
715 r
.append(" blob_file:");
716 AppendNumberTo(&r
, f
.oldest_blob_file_number
);
718 r
.append(" oldest_ancester_time:");
719 AppendNumberTo(&r
, f
.oldest_ancester_time
);
720 r
.append(" file_creation_time:");
721 AppendNumberTo(&r
, f
.file_creation_time
);
722 r
.append(" file_checksum:");
723 r
.append(f
.file_checksum
);
724 r
.append(" file_checksum_func_name: ");
725 r
.append(f
.file_checksum_func_name
);
727 r
.append("\n ColumnFamily: ");
728 AppendNumberTo(&r
, column_family_
);
729 if (is_column_family_add_
) {
730 r
.append("\n ColumnFamilyAdd: ");
731 r
.append(column_family_name_
);
733 if (is_column_family_drop_
) {
734 r
.append("\n ColumnFamilyDrop");
736 if (is_in_atomic_group_
) {
737 r
.append("\n AtomicGroup: ");
738 AppendNumberTo(&r
, remaining_entries_
);
739 r
.append(" entries remains");
// Streams this edit into `jw` as key/value pairs, starting with "EditNumber"
// set to `edit_num`.
// Optional fields are written only when their has_*/is_* flag is set. The
// deleted and added files are written as arrayed objects, and only when their
// container is non-empty. `hex_key` is forwarded to InternalKey::DebugString.
// NOTE(review): the declaration of `jw`, the array start/end calls and the
// return are not visible in this view.
745 std::string
VersionEdit::DebugJSON(int edit_num
, bool hex_key
) const {
747 jw
<< "EditNumber" << edit_num
;
750 jw
<< "DB ID" << db_id_
;
752 if (has_comparator_
) {
753 jw
<< "Comparator" << comparator_
;
755 if (has_log_number_
) {
756 jw
<< "LogNumber" << log_number_
;
758 if (has_prev_log_number_
) {
759 jw
<< "PrevLogNumber" << prev_log_number_
;
761 if (has_next_file_number_
) {
762 jw
<< "NextFileNumber" << next_file_number_
;
764 if (has_max_column_family_
) {
765 jw
<< "MaxColumnFamily" << max_column_family_
;
767 if (has_min_log_number_to_keep_
) {
768 jw
<< "MinLogNumberToKeep" << min_log_number_to_keep_
;
770 if (has_last_sequence_
) {
771 jw
<< "LastSeq" << last_sequence_
;
774 if (!deleted_files_
.empty()) {
775 jw
<< "DeletedFiles";
// Each deleted file becomes one object with its level and file number.
778 for (const auto& deleted_file
: deleted_files_
) {
779 jw
.StartArrayedObject();
780 jw
<< "Level" << deleted_file
.first
;
781 jw
<< "FileNumber" << deleted_file
.second
;
782 jw
.EndArrayedObject();
788 if (!new_files_
.empty()) {
// Each added file becomes one object; "OldestBlobFile" is included only when
// a valid blob file number is recorded.
792 for (size_t i
= 0; i
< new_files_
.size(); i
++) {
793 jw
.StartArrayedObject();
794 jw
<< "Level" << new_files_
[i
].first
;
795 const FileMetaData
& f
= new_files_
[i
].second
;
796 jw
<< "FileNumber" << f
.fd
.GetNumber();
797 jw
<< "FileSize" << f
.fd
.GetFileSize();
798 jw
<< "SmallestIKey" << f
.smallest
.DebugString(hex_key
);
799 jw
<< "LargestIKey" << f
.largest
.DebugString(hex_key
);
800 if (f
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
801 jw
<< "OldestBlobFile" << f
.oldest_blob_file_number
;
803 jw
.EndArrayedObject();
809 jw
<< "ColumnFamily" << column_family_
;
811 if (is_column_family_add_
) {
812 jw
<< "ColumnFamilyAdd" << column_family_name_
;
// The drop entry also reports column_family_name_ as its value.
814 if (is_column_family_drop_
) {
815 jw
<< "ColumnFamilyDrop" << column_family_name_
;
817 if (is_in_atomic_group_
) {
818 jw
<< "AtomicGroup" << remaining_entries_
;
826 } // namespace ROCKSDB_NAMESPACE