]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/log_writer.h" | |
11 | ||
12 | #include <stdint.h> | |
13 | #include "rocksdb/env.h" | |
14 | #include "util/coding.h" | |
15 | #include "util/crc32c.h" | |
16 | #include "util/file_reader_writer.h" | |
17 | ||
18 | namespace rocksdb { | |
19 | namespace log { | |
20 | ||
11fdf7f2 TL |
21 | Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number, |
22 | bool recycle_log_files, bool manual_flush) | |
7c673cae FG |
23 | : dest_(std::move(dest)), |
24 | block_offset_(0), | |
25 | log_number_(log_number), | |
11fdf7f2 TL |
26 | recycle_log_files_(recycle_log_files), |
27 | manual_flush_(manual_flush) { | |
7c673cae FG |
28 | for (int i = 0; i <= kMaxRecordType; i++) { |
29 | char t = static_cast<char>(i); | |
30 | type_crc_[i] = crc32c::Value(&t, 1); | |
31 | } | |
32 | } | |
33 | ||
11fdf7f2 TL |
34 | Writer::~Writer() { WriteBuffer(); } |
35 | ||
36 | Status Writer::WriteBuffer() { return dest_->Flush(); } | |
7c673cae FG |
37 | |
38 | Status Writer::AddRecord(const Slice& slice) { | |
39 | const char* ptr = slice.data(); | |
40 | size_t left = slice.size(); | |
41 | ||
42 | // Header size varies depending on whether we are recycling or not. | |
43 | const int header_size = | |
44 | recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize; | |
45 | ||
46 | // Fragment the record if necessary and emit it. Note that if slice | |
47 | // is empty, we still want to iterate once to emit a single | |
48 | // zero-length record | |
49 | Status s; | |
50 | bool begin = true; | |
51 | do { | |
52 | const int64_t leftover = kBlockSize - block_offset_; | |
53 | assert(leftover >= 0); | |
54 | if (leftover < header_size) { | |
55 | // Switch to a new block | |
56 | if (leftover > 0) { | |
57 | // Fill the trailer (literal below relies on kHeaderSize and | |
58 | // kRecyclableHeaderSize being <= 11) | |
59 | assert(header_size <= 11); | |
11fdf7f2 TL |
60 | s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", |
61 | static_cast<size_t>(leftover))); | |
62 | if (!s.ok()) { | |
63 | break; | |
64 | } | |
7c673cae FG |
65 | } |
66 | block_offset_ = 0; | |
67 | } | |
68 | ||
69 | // Invariant: we never leave < header_size bytes in a block. | |
70 | assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size); | |
71 | ||
72 | const size_t avail = kBlockSize - block_offset_ - header_size; | |
73 | const size_t fragment_length = (left < avail) ? left : avail; | |
74 | ||
75 | RecordType type; | |
76 | const bool end = (left == fragment_length); | |
77 | if (begin && end) { | |
78 | type = recycle_log_files_ ? kRecyclableFullType : kFullType; | |
79 | } else if (begin) { | |
80 | type = recycle_log_files_ ? kRecyclableFirstType : kFirstType; | |
81 | } else if (end) { | |
82 | type = recycle_log_files_ ? kRecyclableLastType : kLastType; | |
83 | } else { | |
84 | type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType; | |
85 | } | |
86 | ||
87 | s = EmitPhysicalRecord(type, ptr, fragment_length); | |
88 | ptr += fragment_length; | |
89 | left -= fragment_length; | |
90 | begin = false; | |
91 | } while (s.ok() && left > 0); | |
92 | return s; | |
93 | } | |
94 | ||
11fdf7f2 TL |
95 | bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } |
96 | ||
7c673cae FG |
97 | Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { |
98 | assert(n <= 0xffff); // Must fit in two bytes | |
99 | ||
100 | size_t header_size; | |
101 | char buf[kRecyclableHeaderSize]; | |
102 | ||
103 | // Format the header | |
104 | buf[4] = static_cast<char>(n & 0xff); | |
105 | buf[5] = static_cast<char>(n >> 8); | |
106 | buf[6] = static_cast<char>(t); | |
107 | ||
108 | uint32_t crc = type_crc_[t]; | |
109 | if (t < kRecyclableFullType) { | |
110 | // Legacy record format | |
111 | assert(block_offset_ + kHeaderSize + n <= kBlockSize); | |
112 | header_size = kHeaderSize; | |
113 | } else { | |
114 | // Recyclable record format | |
115 | assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize); | |
116 | header_size = kRecyclableHeaderSize; | |
117 | ||
118 | // Only encode low 32-bits of the 64-bit log number. This means | |
119 | // we will fail to detect an old record if we recycled a log from | |
120 | // ~4 billion logs ago, but that is effectively impossible, and | |
121 | // even if it were we'dbe far more likely to see a false positive | |
122 | // on the 32-bit CRC. | |
123 | EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_)); | |
124 | crc = crc32c::Extend(crc, buf + 7, 4); | |
125 | } | |
126 | ||
127 | // Compute the crc of the record type and the payload. | |
128 | crc = crc32c::Extend(crc, ptr, n); | |
129 | crc = crc32c::Mask(crc); // Adjust for storage | |
130 | EncodeFixed32(buf, crc); | |
131 | ||
132 | // Write the header and the payload | |
133 | Status s = dest_->Append(Slice(buf, header_size)); | |
134 | if (s.ok()) { | |
135 | s = dest_->Append(Slice(ptr, n)); | |
136 | if (s.ok()) { | |
11fdf7f2 TL |
137 | if (!manual_flush_) { |
138 | s = dest_->Flush(); | |
139 | } | |
7c673cae FG |
140 | } |
141 | } | |
142 | block_offset_ += header_size + n; | |
143 | return s; | |
144 | } | |
145 | ||
146 | } // namespace log | |
147 | } // namespace rocksdb |