]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
f67539c2 | 10 | #include "table/block_based/block_based_table_builder.h" |
7c673cae FG |
11 | |
12 | #include <assert.h> | |
7c673cae | 13 | #include <stdio.h> |
20effc67 | 14 | #include <atomic> |
7c673cae FG |
15 | #include <list> |
16 | #include <map> | |
17 | #include <memory> | |
18 | #include <string> | |
19 | #include <unordered_map> | |
20 | #include <utility> | |
21 | ||
22 | #include "db/dbformat.h" | |
f67539c2 | 23 | #include "index_builder.h" |
7c673cae FG |
24 | |
25 | #include "rocksdb/cache.h" | |
26 | #include "rocksdb/comparator.h" | |
27 | #include "rocksdb/env.h" | |
7c673cae FG |
28 | #include "rocksdb/flush_block_policy.h" |
29 | #include "rocksdb/merge_operator.h" | |
30 | #include "rocksdb/table.h" | |
31 | ||
f67539c2 TL |
32 | #include "table/block_based/block.h" |
33 | #include "table/block_based/block_based_filter_block.h" | |
34 | #include "table/block_based/block_based_table_factory.h" | |
35 | #include "table/block_based/block_based_table_reader.h" | |
36 | #include "table/block_based/block_builder.h" | |
37 | #include "table/block_based/filter_block.h" | |
38 | #include "table/block_based/filter_policy_internal.h" | |
39 | #include "table/block_based/full_filter_block.h" | |
40 | #include "table/block_based/partitioned_filter_block.h" | |
7c673cae | 41 | #include "table/format.h" |
7c673cae FG |
42 | #include "table/table_builder.h" |
43 | ||
f67539c2 | 44 | #include "memory/memory_allocator.h" |
7c673cae FG |
45 | #include "util/coding.h" |
46 | #include "util/compression.h" | |
47 | #include "util/crc32c.h" | |
48 | #include "util/stop_watch.h" | |
11fdf7f2 | 49 | #include "util/string_util.h" |
20effc67 | 50 | #include "util/work_queue.h" |
7c673cae FG |
51 | #include "util/xxhash.h" |
52 | ||
f67539c2 | 53 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
54 | |
// Names of the hash-index prefix meta blocks. They are only declared here
// (extern); the definitions live in another translation unit.
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
57 | ||
7c673cae FG |
58 | |
59 | // Without anonymous namespace here, we fail the warning -Wmissing-prototypes | |
60 | namespace { | |
61 | ||
62 | // Create a filter block builder based on its type. | |
63 | FilterBlockBuilder* CreateFilterBlockBuilder( | |
11fdf7f2 | 64 | const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt, |
f67539c2 | 65 | const FilterBuildingContext& context, |
11fdf7f2 | 66 | const bool use_delta_encoding_for_index_values, |
7c673cae | 67 | PartitionedIndexBuilder* const p_index_builder) { |
f67539c2 | 68 | const BlockBasedTableOptions& table_opt = context.table_options; |
7c673cae FG |
69 | if (table_opt.filter_policy == nullptr) return nullptr; |
70 | ||
71 | FilterBitsBuilder* filter_bits_builder = | |
f67539c2 | 72 | BloomFilterPolicy::GetBuilderFromContext(context); |
7c673cae | 73 | if (filter_bits_builder == nullptr) { |
11fdf7f2 TL |
74 | return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(), |
75 | table_opt); | |
7c673cae FG |
76 | } else { |
77 | if (table_opt.partition_filters) { | |
78 | assert(p_index_builder != nullptr); | |
11fdf7f2 TL |
79 | // Since after partition cut request from filter builder it takes time |
80 | // until index builder actully cuts the partition, we take the lower bound | |
81 | // as partition size. | |
82 | assert(table_opt.block_size_deviation <= 100); | |
494da23a TL |
83 | auto partition_size = |
84 | static_cast<uint32_t>(((table_opt.metadata_block_size * | |
85 | (100 - table_opt.block_size_deviation)) + | |
86 | 99) / | |
87 | 100); | |
11fdf7f2 | 88 | partition_size = std::max(partition_size, static_cast<uint32_t>(1)); |
7c673cae | 89 | return new PartitionedFilterBlockBuilder( |
11fdf7f2 | 90 | mopt.prefix_extractor.get(), table_opt.whole_key_filtering, |
7c673cae | 91 | filter_bits_builder, table_opt.index_block_restart_interval, |
11fdf7f2 | 92 | use_delta_encoding_for_index_values, p_index_builder, partition_size); |
7c673cae | 93 | } else { |
11fdf7f2 | 94 | return new FullFilterBlockBuilder(mopt.prefix_extractor.get(), |
7c673cae FG |
95 | table_opt.whole_key_filtering, |
96 | filter_bits_builder); | |
97 | } | |
98 | } | |
99 | } | |
100 | ||
// Reports whether compression saved enough space to be worth keeping. The
// compressed output must be strictly smaller than raw_size - raw_size / 8,
// i.e. compression has to shave off at least ~12.5% of the raw bytes.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t max_acceptable_size = raw_size - raw_size / 8u;
  return compressed_size < max_acceptable_size;
}
105 | ||
494da23a TL |
106 | } // namespace |
107 | ||
108 | // format_version is the block format as defined in include/rocksdb/table.h | |
109 | Slice CompressBlock(const Slice& raw, const CompressionInfo& info, | |
110 | CompressionType* type, uint32_t format_version, | |
111 | bool do_sample, std::string* compressed_output, | |
112 | std::string* sampled_output_fast, | |
113 | std::string* sampled_output_slow) { | |
20effc67 TL |
114 | assert(type); |
115 | assert(compressed_output); | |
116 | assert(compressed_output->empty()); | |
7c673cae | 117 | |
494da23a TL |
118 | // If requested, we sample one in every N block with a |
119 | // fast and slow compression algorithm and report the stats. | |
120 | // The users can use these stats to decide if it is worthwhile | |
121 | // enabling compression and they also get a hint about which | |
122 | // compression algorithm wil be beneficial. | |
123 | if (do_sample && info.SampleForCompression() && | |
20effc67 TL |
124 | Random::GetTLSInstance()->OneIn( |
125 | static_cast<int>(info.SampleForCompression()))) { | |
494da23a | 126 | // Sampling with a fast compression algorithm |
20effc67 | 127 | if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) { |
494da23a TL |
128 | CompressionType c = |
129 | LZ4_Supported() ? kLZ4Compression : kSnappyCompression; | |
130 | CompressionContext context(c); | |
131 | CompressionOptions options; | |
132 | CompressionInfo info_tmp(options, context, | |
133 | CompressionDict::GetEmptyDict(), c, | |
134 | info.SampleForCompression()); | |
135 | ||
20effc67 TL |
136 | CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version), |
137 | sampled_output_fast); | |
494da23a TL |
138 | } |
139 | ||
140 | // Sampling with a slow but high-compression algorithm | |
20effc67 | 141 | if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) { |
494da23a TL |
142 | CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression; |
143 | CompressionContext context(c); | |
144 | CompressionOptions options; | |
145 | CompressionInfo info_tmp(options, context, | |
146 | CompressionDict::GetEmptyDict(), c, | |
147 | info.SampleForCompression()); | |
20effc67 TL |
148 | |
149 | CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version), | |
150 | sampled_output_slow); | |
494da23a TL |
151 | } |
152 | } | |
153 | ||
20effc67 TL |
154 | if (info.type() == kNoCompression) { |
155 | *type = kNoCompression; | |
156 | return raw; | |
494da23a TL |
157 | } |
158 | ||
20effc67 TL |
159 | // Actually compress the data; if the compression method is not supported, |
160 | // or the compression fails etc., just fall back to uncompressed | |
161 | if (!CompressData(raw, info, GetCompressFormatForVersion(format_version), | |
162 | compressed_output)) { | |
163 | *type = kNoCompression; | |
164 | return raw; | |
165 | } | |
166 | ||
167 | // Check the compression ratio; if it's not good enough, just fall back to | |
168 | // uncompressed | |
169 | if (!GoodCompressionRatio(compressed_output->size(), raw.size())) { | |
170 | *type = kNoCompression; | |
171 | return raw; | |
172 | } | |
173 | ||
174 | *type = info.type(); | |
175 | return *compressed_output; | |
7c673cae FG |
176 | } |
177 | ||
// kBlockBasedTableMagicNumber was picked by running
//   echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Other .cc files may also access kBlockBasedTableMagicNumber, so the header
// declares it extern. This is the one non-extern definition that actually
// allocates its storage.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing the legacy block-based table format
// (for backwards compatibility).
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
190 | ||
191 | // A collector that collects properties of interest to block-based table. | |
192 | // For now this class looks heavy-weight since we only write one additional | |
193 | // property. | |
194 | // But in the foreseeable future, we will add more and more properties that are | |
195 | // specific to block-based table. | |
196 | class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector | |
197 | : public IntTblPropCollector { | |
198 | public: | |
199 | explicit BlockBasedTablePropertiesCollector( | |
200 | BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering, | |
201 | bool prefix_filtering) | |
202 | : index_type_(index_type), | |
203 | whole_key_filtering_(whole_key_filtering), | |
204 | prefix_filtering_(prefix_filtering) {} | |
205 | ||
494da23a TL |
206 | Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/, |
207 | uint64_t /*file_size*/) override { | |
7c673cae FG |
208 | // Intentionally left blank. Have no interest in collecting stats for |
209 | // individual key/value pairs. | |
210 | return Status::OK(); | |
211 | } | |
212 | ||
494da23a TL |
213 | virtual void BlockAdd(uint64_t /* blockRawBytes */, |
214 | uint64_t /* blockCompressedBytesFast */, | |
215 | uint64_t /* blockCompressedBytesSlow */) override { | |
216 | // Intentionally left blank. No interest in collecting stats for | |
217 | // blocks. | |
218 | return; | |
219 | } | |
220 | ||
221 | Status Finish(UserCollectedProperties* properties) override { | |
7c673cae FG |
222 | std::string val; |
223 | PutFixed32(&val, static_cast<uint32_t>(index_type_)); | |
224 | properties->insert({BlockBasedTablePropertyNames::kIndexType, val}); | |
225 | properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering, | |
226 | whole_key_filtering_ ? kPropTrue : kPropFalse}); | |
227 | properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering, | |
228 | prefix_filtering_ ? kPropTrue : kPropFalse}); | |
229 | return Status::OK(); | |
230 | } | |
231 | ||
232 | // The name of the properties collector can be used for debugging purpose. | |
494da23a | 233 | const char* Name() const override { |
7c673cae FG |
234 | return "BlockBasedTablePropertiesCollector"; |
235 | } | |
236 | ||
494da23a | 237 | UserCollectedProperties GetReadableProperties() const override { |
7c673cae FG |
238 | // Intentionally left blank. |
239 | return UserCollectedProperties(); | |
240 | } | |
241 | ||
242 | private: | |
243 | BlockBasedTableOptions::IndexType index_type_; | |
244 | bool whole_key_filtering_; | |
245 | bool prefix_filtering_; | |
246 | }; | |
247 | ||
248 | struct BlockBasedTableBuilder::Rep { | |
249 | const ImmutableCFOptions ioptions; | |
11fdf7f2 | 250 | const MutableCFOptions moptions; |
7c673cae FG |
251 | const BlockBasedTableOptions table_options; |
252 | const InternalKeyComparator& internal_comparator; | |
253 | WritableFileWriter* file; | |
20effc67 | 254 | std::atomic<uint64_t> offset; |
11fdf7f2 | 255 | size_t alignment; |
7c673cae | 256 | BlockBuilder data_block; |
494da23a TL |
257 | // Buffers uncompressed data blocks and keys to replay later. Needed when |
258 | // compression dictionary is enabled so we can finalize the dictionary before | |
259 | // compressing any data blocks. | |
260 | // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data | |
261 | // blocks as it's redundant, but it's easier to implement for now. | |
262 | std::vector<std::pair<std::string, std::vector<std::string>>> | |
263 | data_block_and_keys_buffers; | |
7c673cae FG |
264 | BlockBuilder range_del_block; |
265 | ||
266 | InternalKeySliceTransform internal_prefix_transform; | |
267 | std::unique_ptr<IndexBuilder> index_builder; | |
11fdf7f2 | 268 | PartitionedIndexBuilder* p_index_builder_ = nullptr; |
7c673cae FG |
269 | |
270 | std::string last_key; | |
20effc67 | 271 | const Slice* first_key_in_next_block = nullptr; |
494da23a TL |
272 | CompressionType compression_type; |
273 | uint64_t sample_for_compression; | |
274 | CompressionOptions compression_opts; | |
275 | std::unique_ptr<CompressionDict> compression_dict; | |
20effc67 TL |
276 | std::vector<std::unique_ptr<CompressionContext>> compression_ctxs; |
277 | std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs; | |
494da23a TL |
278 | std::unique_ptr<UncompressionDict> verify_dict; |
279 | ||
280 | size_t data_begin_offset = 0; | |
281 | ||
7c673cae FG |
282 | TableProperties props; |
283 | ||
494da23a TL |
284 | // States of the builder. |
285 | // | |
286 | // - `kBuffered`: This is the initial state where zero or more data blocks are | |
287 | // accumulated uncompressed in-memory. From this state, call | |
288 | // `EnterUnbuffered()` to finalize the compression dictionary if enabled, | |
289 | // compress/write out any buffered blocks, and proceed to the `kUnbuffered` | |
290 | // state. | |
291 | // | |
292 | // - `kUnbuffered`: This is the state when compression dictionary is finalized | |
293 | // either because it wasn't enabled in the first place or it's been created | |
294 | // from sampling previously buffered data. In this state, blocks are simply | |
295 | // compressed/written out as they fill up. From this state, call `Finish()` | |
296 | // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete | |
297 | // the partially created file. | |
298 | // | |
299 | // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been | |
300 | // called, so the table builder is no longer usable. We must be in this | |
301 | // state by the time the destructor runs. | |
302 | enum class State { | |
303 | kBuffered, | |
304 | kUnbuffered, | |
305 | kClosed, | |
306 | }; | |
307 | State state; | |
308 | ||
11fdf7f2 | 309 | const bool use_delta_encoding_for_index_values; |
7c673cae FG |
310 | std::unique_ptr<FilterBlockBuilder> filter_builder; |
311 | char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize]; | |
312 | size_t compressed_cache_key_prefix_size; | |
313 | ||
314 | BlockHandle pending_handle; // Handle to add to index block | |
315 | ||
316 | std::string compressed_output; | |
317 | std::unique_ptr<FlushBlockPolicy> flush_block_policy; | |
f67539c2 | 318 | int level_at_creation; |
7c673cae FG |
319 | uint32_t column_family_id; |
320 | const std::string& column_family_name; | |
11fdf7f2 TL |
321 | uint64_t creation_time = 0; |
322 | uint64_t oldest_key_time = 0; | |
494da23a | 323 | const uint64_t target_file_size; |
f67539c2 | 324 | uint64_t file_creation_time = 0; |
7c673cae | 325 | |
20effc67 TL |
326 | // DB IDs |
327 | const std::string db_id; | |
328 | const std::string db_session_id; | |
329 | std::string db_host_id; | |
330 | ||
7c673cae FG |
331 | std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors; |
332 | ||
20effc67 TL |
333 | std::unique_ptr<ParallelCompressionRep> pc_rep; |
334 | ||
335 | uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } | |
336 | void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } | |
337 | ||
338 | bool IsParallelCompressionEnabled() const { | |
339 | return compression_opts.parallel_threads > 1; | |
340 | } | |
341 | ||
342 | Status GetStatus() { | |
343 | // We need to make modifications of status visible when status_ok is set | |
344 | // to false, and this is ensured by status_mutex, so no special memory | |
345 | // order for status_ok is required. | |
346 | if (status_ok.load(std::memory_order_relaxed)) { | |
347 | return Status::OK(); | |
348 | } else { | |
349 | return CopyStatus(); | |
350 | } | |
351 | } | |
352 | ||
353 | Status CopyStatus() { | |
354 | std::lock_guard<std::mutex> lock(status_mutex); | |
355 | return status; | |
356 | } | |
357 | ||
358 | IOStatus GetIOStatus() { | |
359 | // We need to make modifications of io_status visible when status_ok is set | |
360 | // to false, and this is ensured by io_status_mutex, so no special memory | |
361 | // order for io_status_ok is required. | |
362 | if (io_status_ok.load(std::memory_order_relaxed)) { | |
363 | return IOStatus::OK(); | |
364 | } else { | |
365 | return CopyIOStatus(); | |
366 | } | |
367 | } | |
368 | ||
369 | IOStatus CopyIOStatus() { | |
370 | std::lock_guard<std::mutex> lock(io_status_mutex); | |
371 | return io_status; | |
372 | } | |
373 | ||
374 | // Never erase an existing status that is not OK. | |
375 | void SetStatus(Status s) { | |
376 | if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { | |
377 | // Locking is an overkill for non compression_opts.parallel_threads | |
378 | // case but since it's unlikely that s is not OK, we take this cost | |
379 | // to be simplicity. | |
380 | std::lock_guard<std::mutex> lock(status_mutex); | |
381 | status = s; | |
382 | status_ok.store(false, std::memory_order_relaxed); | |
383 | } | |
384 | } | |
385 | ||
386 | // Never erase an existing I/O status that is not OK. | |
387 | void SetIOStatus(IOStatus ios) { | |
388 | if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { | |
389 | // Locking is an overkill for non compression_opts.parallel_threads | |
390 | // case but since it's unlikely that s is not OK, we take this cost | |
391 | // to be simplicity. | |
392 | std::lock_guard<std::mutex> lock(io_status_mutex); | |
393 | io_status = ios; | |
394 | io_status_ok.store(false, std::memory_order_relaxed); | |
395 | } | |
396 | } | |
397 | ||
11fdf7f2 | 398 | Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions, |
7c673cae FG |
399 | const BlockBasedTableOptions& table_opt, |
400 | const InternalKeyComparator& icomparator, | |
401 | const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* | |
402 | int_tbl_prop_collector_factories, | |
403 | uint32_t _column_family_id, WritableFileWriter* f, | |
404 | const CompressionType _compression_type, | |
494da23a TL |
405 | const uint64_t _sample_for_compression, |
406 | const CompressionOptions& _compression_opts, const bool skip_filters, | |
f67539c2 TL |
407 | const int _level_at_creation, const std::string& _column_family_name, |
408 | const uint64_t _creation_time, const uint64_t _oldest_key_time, | |
20effc67 TL |
409 | const uint64_t _target_file_size, const uint64_t _file_creation_time, |
410 | const std::string& _db_id, const std::string& _db_session_id) | |
7c673cae | 411 | : ioptions(_ioptions), |
11fdf7f2 | 412 | moptions(_moptions), |
7c673cae FG |
413 | table_options(table_opt), |
414 | internal_comparator(icomparator), | |
415 | file(f), | |
20effc67 | 416 | offset(0), |
11fdf7f2 TL |
417 | alignment(table_options.block_align |
418 | ? std::min(table_options.block_size, kDefaultPageSize) | |
419 | : 0), | |
7c673cae | 420 | data_block(table_options.block_restart_interval, |
11fdf7f2 TL |
421 | table_options.use_delta_encoding, |
422 | false /* use_value_delta_encoding */, | |
423 | icomparator.user_comparator() | |
424 | ->CanKeysWithDifferentByteContentsBeEqual() | |
425 | ? BlockBasedTableOptions::kDataBlockBinarySearch | |
426 | : table_options.data_block_index_type, | |
427 | table_options.data_block_hash_table_util_ratio), | |
428 | range_del_block(1 /* block_restart_interval */), | |
429 | internal_prefix_transform(_moptions.prefix_extractor.get()), | |
494da23a TL |
430 | compression_type(_compression_type), |
431 | sample_for_compression(_sample_for_compression), | |
432 | compression_opts(_compression_opts), | |
433 | compression_dict(), | |
20effc67 TL |
434 | compression_ctxs(_compression_opts.parallel_threads), |
435 | verify_ctxs(_compression_opts.parallel_threads), | |
494da23a TL |
436 | verify_dict(), |
437 | state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered | |
438 | : State::kUnbuffered), | |
11fdf7f2 TL |
439 | use_delta_encoding_for_index_values(table_opt.format_version >= 4 && |
440 | !table_opt.block_align), | |
441 | compressed_cache_key_prefix_size(0), | |
7c673cae FG |
442 | flush_block_policy( |
443 | table_options.flush_block_policy_factory->NewFlushBlockPolicy( | |
444 | table_options, data_block)), | |
f67539c2 | 445 | level_at_creation(_level_at_creation), |
7c673cae | 446 | column_family_id(_column_family_id), |
11fdf7f2 TL |
447 | column_family_name(_column_family_name), |
448 | creation_time(_creation_time), | |
494da23a | 449 | oldest_key_time(_oldest_key_time), |
f67539c2 | 450 | target_file_size(_target_file_size), |
20effc67 TL |
451 | file_creation_time(_file_creation_time), |
452 | db_id(_db_id), | |
453 | db_session_id(_db_session_id), | |
454 | db_host_id(ioptions.db_host_id), | |
455 | status_ok(true), | |
456 | io_status_ok(true) { | |
457 | for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { | |
458 | compression_ctxs[i].reset(new CompressionContext(compression_type)); | |
459 | } | |
7c673cae FG |
460 | if (table_options.index_type == |
461 | BlockBasedTableOptions::kTwoLevelIndexSearch) { | |
11fdf7f2 TL |
462 | p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( |
463 | &internal_comparator, use_delta_encoding_for_index_values, | |
464 | table_options); | |
465 | index_builder.reset(p_index_builder_); | |
7c673cae FG |
466 | } else { |
467 | index_builder.reset(IndexBuilder::CreateIndexBuilder( | |
468 | table_options.index_type, &internal_comparator, | |
11fdf7f2 TL |
469 | &this->internal_prefix_transform, use_delta_encoding_for_index_values, |
470 | table_options)); | |
7c673cae FG |
471 | } |
472 | if (skip_filters) { | |
473 | filter_builder = nullptr; | |
474 | } else { | |
f67539c2 TL |
475 | FilterBuildingContext context(table_options); |
476 | context.column_family_name = column_family_name; | |
477 | context.compaction_style = ioptions.compaction_style; | |
478 | context.level_at_creation = level_at_creation; | |
479 | context.info_log = ioptions.info_log; | |
11fdf7f2 | 480 | filter_builder.reset(CreateFilterBlockBuilder( |
f67539c2 TL |
481 | ioptions, moptions, context, use_delta_encoding_for_index_values, |
482 | p_index_builder_)); | |
7c673cae FG |
483 | } |
484 | ||
485 | for (auto& collector_factories : *int_tbl_prop_collector_factories) { | |
486 | table_properties_collectors.emplace_back( | |
487 | collector_factories->CreateIntTblPropCollector(column_family_id)); | |
488 | } | |
489 | table_properties_collectors.emplace_back( | |
490 | new BlockBasedTablePropertiesCollector( | |
491 | table_options.index_type, table_options.whole_key_filtering, | |
11fdf7f2 TL |
492 | _moptions.prefix_extractor != nullptr)); |
493 | if (table_options.verify_compression) { | |
20effc67 TL |
494 | for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { |
495 | verify_ctxs[i].reset(new UncompressionContext(compression_type)); | |
496 | } | |
497 | } | |
498 | ||
499 | if (!ReifyDbHostIdProperty(ioptions.env, &db_host_id).ok()) { | |
500 | ROCKS_LOG_INFO(ioptions.info_log, "db_host_id property will not be set"); | |
11fdf7f2 | 501 | } |
7c673cae | 502 | } |
11fdf7f2 TL |
503 | |
504 | Rep(const Rep&) = delete; | |
505 | Rep& operator=(const Rep&) = delete; | |
506 | ||
20effc67 TL |
507 | private: |
508 | // Synchronize status & io_status accesses across threads from main thread, | |
509 | // compression thread and write thread in parallel compression. | |
510 | std::mutex status_mutex; | |
511 | std::atomic<bool> status_ok; | |
512 | Status status; | |
513 | std::mutex io_status_mutex; | |
514 | std::atomic<bool> io_status_ok; | |
515 | IOStatus io_status; | |
516 | }; | |
517 | ||
518 | struct BlockBasedTableBuilder::ParallelCompressionRep { | |
519 | // Keys is a wrapper of vector of strings avoiding | |
520 | // releasing string memories during vector clear() | |
521 | // in order to save memory allocation overhead | |
522 | class Keys { | |
523 | public: | |
524 | Keys() : keys_(kKeysInitSize), size_(0) {} | |
525 | void PushBack(const Slice& key) { | |
526 | if (size_ == keys_.size()) { | |
527 | keys_.emplace_back(key.data(), key.size()); | |
528 | } else { | |
529 | keys_[size_].assign(key.data(), key.size()); | |
530 | } | |
531 | size_++; | |
532 | } | |
533 | void SwapAssign(std::vector<std::string>& keys) { | |
534 | size_ = keys.size(); | |
535 | std::swap(keys_, keys); | |
536 | } | |
537 | void Clear() { size_ = 0; } | |
538 | size_t Size() { return size_; } | |
539 | std::string& Back() { return keys_[size_ - 1]; } | |
540 | std::string& operator[](size_t idx) { | |
541 | assert(idx < size_); | |
542 | return keys_[idx]; | |
543 | } | |
544 | ||
545 | private: | |
546 | const size_t kKeysInitSize = 32; | |
547 | std::vector<std::string> keys_; | |
548 | size_t size_; | |
549 | }; | |
550 | std::unique_ptr<Keys> curr_block_keys; | |
551 | ||
552 | class BlockRepSlot; | |
553 | ||
// BlockRep instances are fetched from and recycled to
// block_rep_pool during parallel compression. All pointer members are
// allocated once in the ParallelCompressionRep constructor and reused.
struct BlockRep {
  // Uncompressed block contents. NOTE(review): presumably a view into
  // `data`; confirm in PrepareBlock().
  Slice contents;
  // Block contents after compression. NOTE(review): presumably a view into
  // `compressed_data` (or the raw data when not compressed); confirm.
  Slice compressed_contents;
  // Reusable buffer owned by this BlockRep (uncompressed bytes, judging by
  // the name; TODO confirm).
  std::unique_ptr<std::string> data;
  // Reusable buffer owned by this BlockRep for compressed bytes.
  std::unique_ptr<std::string> compressed_data;
  // NOTE(review): presumably the compression type actually applied to this
  // block; confirm.
  CompressionType compression_type;
  // Owned copy of the first key of the following block (empty string
  // initially).
  std::unique_ptr<std::string> first_key_in_next_block;
  // Keys belonging to this block.
  std::unique_ptr<Keys> keys;
  // Slot used to hand this BlockRep to the write thread in block order.
  std::unique_ptr<BlockRepSlot> slot;
  // Per-block processing status; initialized to OK.
  Status status;
};
567 | // Use a vector of BlockRep as a buffer for a determined number | |
568 | // of BlockRep structures. All data referenced by pointers in | |
569 | // BlockRep will be freed when this vector is destructed. | |
570 | typedef std::vector<BlockRep> BlockRepBuffer; | |
571 | BlockRepBuffer block_rep_buf; | |
572 | // Use a thread-safe queue for concurrent access from block | |
573 | // building thread and writer thread. | |
574 | typedef WorkQueue<BlockRep*> BlockRepPool; | |
575 | BlockRepPool block_rep_pool; | |
576 | ||
577 | // Use BlockRepSlot to keep block order in write thread. | |
578 | // slot_ will pass references to BlockRep | |
579 | class BlockRepSlot { | |
580 | public: | |
581 | BlockRepSlot() : slot_(1) {} | |
582 | template <typename T> | |
583 | void Fill(T&& rep) { | |
584 | slot_.push(std::forward<T>(rep)); | |
585 | }; | |
586 | void Take(BlockRep*& rep) { slot_.pop(rep); } | |
587 | ||
588 | private: | |
589 | // slot_ will pass references to BlockRep in block_rep_buf, | |
590 | // and those references are always valid before the destruction of | |
591 | // block_rep_buf. | |
592 | WorkQueue<BlockRep*> slot_; | |
593 | }; | |
594 | ||
595 | // Compression queue will pass references to BlockRep in block_rep_buf, | |
596 | // and those references are always valid before the destruction of | |
597 | // block_rep_buf. | |
598 | typedef WorkQueue<BlockRep*> CompressQueue; | |
599 | CompressQueue compress_queue; | |
600 | std::vector<port::Thread> compress_thread_pool; | |
601 | ||
602 | // Write queue will pass references to BlockRep::slot in block_rep_buf, | |
603 | // and those references are always valid before the corresponding | |
604 | // BlockRep::slot is destructed, which is before the destruction of | |
605 | // block_rep_buf. | |
606 | typedef WorkQueue<BlockRepSlot*> WriteQueue; | |
607 | WriteQueue write_queue; | |
608 | std::unique_ptr<port::Thread> write_thread; | |
609 | ||
610 | // Estimate output file size when parallel compression is enabled. This is | |
611 | // necessary because compression & flush are no longer synchronized, | |
612 | // and BlockBasedTableBuilder::FileSize() is no longer accurate. | |
613 | // memory_order_relaxed suffices because accurate statistics is not required. | |
614 | class FileSizeEstimator { | |
615 | public: | |
616 | explicit FileSizeEstimator() | |
617 | : raw_bytes_compressed(0), | |
618 | raw_bytes_curr_block(0), | |
619 | raw_bytes_curr_block_set(false), | |
620 | raw_bytes_inflight(0), | |
621 | blocks_inflight(0), | |
622 | curr_compression_ratio(0), | |
623 | estimated_file_size(0) {} | |
624 | ||
625 | // Estimate file size when a block is about to be emitted to | |
626 | // compression thread | |
627 | void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) { | |
628 | uint64_t new_raw_bytes_inflight = | |
629 | raw_bytes_inflight.fetch_add(raw_block_size, | |
630 | std::memory_order_relaxed) + | |
631 | raw_block_size; | |
632 | ||
633 | uint64_t new_blocks_inflight = | |
634 | blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; | |
635 | ||
636 | estimated_file_size.store( | |
637 | curr_file_size + | |
638 | static_cast<uint64_t>( | |
639 | static_cast<double>(new_raw_bytes_inflight) * | |
640 | curr_compression_ratio.load(std::memory_order_relaxed)) + | |
641 | new_blocks_inflight * kBlockTrailerSize, | |
642 | std::memory_order_relaxed); | |
643 | } | |
644 | ||
645 | // Estimate file size when a block is already reaped from | |
646 | // compression thread | |
647 | void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { | |
648 | assert(raw_bytes_curr_block_set); | |
649 | ||
650 | uint64_t new_raw_bytes_compressed = | |
651 | raw_bytes_compressed + raw_bytes_curr_block; | |
652 | assert(new_raw_bytes_compressed > 0); | |
653 | ||
654 | curr_compression_ratio.store( | |
655 | (curr_compression_ratio.load(std::memory_order_relaxed) * | |
656 | raw_bytes_compressed + | |
657 | compressed_block_size) / | |
658 | static_cast<double>(new_raw_bytes_compressed), | |
659 | std::memory_order_relaxed); | |
660 | raw_bytes_compressed = new_raw_bytes_compressed; | |
661 | ||
662 | uint64_t new_raw_bytes_inflight = | |
663 | raw_bytes_inflight.fetch_sub(raw_bytes_curr_block, | |
664 | std::memory_order_relaxed) - | |
665 | raw_bytes_curr_block; | |
666 | ||
667 | uint64_t new_blocks_inflight = | |
668 | blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; | |
669 | ||
670 | estimated_file_size.store( | |
671 | curr_file_size + | |
672 | static_cast<uint64_t>( | |
673 | static_cast<double>(new_raw_bytes_inflight) * | |
674 | curr_compression_ratio.load(std::memory_order_relaxed)) + | |
675 | new_blocks_inflight * kBlockTrailerSize, | |
676 | std::memory_order_relaxed); | |
677 | ||
678 | raw_bytes_curr_block_set = false; | |
679 | } | |
680 | ||
681 | void SetEstimatedFileSize(uint64_t size) { | |
682 | estimated_file_size.store(size, std::memory_order_relaxed); | |
683 | } | |
684 | ||
685 | uint64_t GetEstimatedFileSize() { | |
686 | return estimated_file_size.load(std::memory_order_relaxed); | |
687 | } | |
688 | ||
689 | void SetCurrBlockRawSize(uint64_t size) { | |
690 | raw_bytes_curr_block = size; | |
691 | raw_bytes_curr_block_set = true; | |
692 | } | |
693 | ||
694 | private: | |
695 | // Raw bytes compressed so far. | |
696 | uint64_t raw_bytes_compressed; | |
697 | // Size of current block being appended. | |
698 | uint64_t raw_bytes_curr_block; | |
699 | // Whether raw_bytes_curr_block has been set for next | |
700 | // ReapBlock call. | |
701 | bool raw_bytes_curr_block_set; | |
702 | // Raw bytes under compression and not appended yet. | |
703 | std::atomic<uint64_t> raw_bytes_inflight; | |
704 | // Number of blocks under compression and not appended yet. | |
705 | std::atomic<uint64_t> blocks_inflight; | |
706 | // Current compression ratio, maintained by BGWorkWriteRawBlock. | |
707 | std::atomic<double> curr_compression_ratio; | |
708 | // Estimated SST file size. | |
709 | std::atomic<uint64_t> estimated_file_size; | |
710 | }; | |
711 | FileSizeEstimator file_size_estimator; | |
712 | ||
713 | // Facilities used for waiting first block completion. Need to Wait for | |
714 | // the completion of first block compression and flush to get a non-zero | |
715 | // compression ratio. | |
716 | std::atomic<bool> first_block_processed; | |
717 | std::condition_variable first_block_cond; | |
718 | std::mutex first_block_mutex; | |
719 | ||
720 | explicit ParallelCompressionRep(uint32_t parallel_threads) | |
721 | : curr_block_keys(new Keys()), | |
722 | block_rep_buf(parallel_threads), | |
723 | block_rep_pool(parallel_threads), | |
724 | compress_queue(parallel_threads), | |
725 | write_queue(parallel_threads), | |
726 | first_block_processed(false) { | |
727 | for (uint32_t i = 0; i < parallel_threads; i++) { | |
728 | block_rep_buf[i].contents = Slice(); | |
729 | block_rep_buf[i].compressed_contents = Slice(); | |
730 | block_rep_buf[i].data.reset(new std::string()); | |
731 | block_rep_buf[i].compressed_data.reset(new std::string()); | |
732 | block_rep_buf[i].compression_type = CompressionType(); | |
733 | block_rep_buf[i].first_key_in_next_block.reset(new std::string()); | |
734 | block_rep_buf[i].keys.reset(new Keys()); | |
735 | block_rep_buf[i].slot.reset(new BlockRepSlot()); | |
736 | block_rep_buf[i].status = Status::OK(); | |
737 | block_rep_pool.push(&block_rep_buf[i]); | |
738 | } | |
739 | } | |
740 | ||
741 | ~ParallelCompressionRep() { block_rep_pool.finish(); } | |
742 | ||
743 | // Make a block prepared to be emitted to compression thread | |
744 | // Used in non-buffered mode | |
745 | BlockRep* PrepareBlock(CompressionType compression_type, | |
746 | const Slice* first_key_in_next_block, | |
747 | BlockBuilder* data_block) { | |
748 | BlockRep* block_rep = | |
749 | PrepareBlockInternal(compression_type, first_key_in_next_block); | |
750 | assert(block_rep != nullptr); | |
751 | data_block->SwapAndReset(*(block_rep->data)); | |
752 | block_rep->contents = *(block_rep->data); | |
753 | std::swap(block_rep->keys, curr_block_keys); | |
754 | curr_block_keys->Clear(); | |
755 | return block_rep; | |
756 | } | |
757 | ||
758 | // Used in EnterUnbuffered | |
759 | BlockRep* PrepareBlock(CompressionType compression_type, | |
760 | const Slice* first_key_in_next_block, | |
761 | std::string* data_block, | |
762 | std::vector<std::string>* keys) { | |
763 | BlockRep* block_rep = | |
764 | PrepareBlockInternal(compression_type, first_key_in_next_block); | |
765 | assert(block_rep != nullptr); | |
766 | std::swap(*(block_rep->data), *data_block); | |
767 | block_rep->contents = *(block_rep->data); | |
768 | block_rep->keys->SwapAssign(*keys); | |
769 | return block_rep; | |
770 | } | |
771 | ||
772 | // Emit a block to compression thread | |
773 | void EmitBlock(BlockRep* block_rep) { | |
774 | assert(block_rep != nullptr); | |
775 | assert(block_rep->status.ok()); | |
776 | if (!write_queue.push(block_rep->slot.get())) { | |
777 | return; | |
778 | } | |
779 | if (!compress_queue.push(block_rep)) { | |
780 | return; | |
781 | } | |
782 | ||
783 | if (!first_block_processed.load(std::memory_order_relaxed)) { | |
784 | std::unique_lock<std::mutex> lock(first_block_mutex); | |
785 | first_block_cond.wait(lock, [this] { | |
786 | return first_block_processed.load(std::memory_order_relaxed); | |
787 | }); | |
788 | } | |
789 | } | |
790 | ||
791 | // Reap a block from compression thread | |
792 | void ReapBlock(BlockRep* block_rep) { | |
793 | assert(block_rep != nullptr); | |
794 | block_rep->compressed_data->clear(); | |
795 | block_rep_pool.push(block_rep); | |
796 | ||
797 | if (!first_block_processed.load(std::memory_order_relaxed)) { | |
798 | std::lock_guard<std::mutex> lock(first_block_mutex); | |
799 | first_block_processed.store(true, std::memory_order_relaxed); | |
800 | first_block_cond.notify_one(); | |
801 | } | |
802 | } | |
803 | ||
804 | private: | |
805 | BlockRep* PrepareBlockInternal(CompressionType compression_type, | |
806 | const Slice* first_key_in_next_block) { | |
807 | BlockRep* block_rep = nullptr; | |
808 | block_rep_pool.pop(block_rep); | |
809 | assert(block_rep != nullptr); | |
810 | ||
811 | assert(block_rep->data); | |
812 | ||
813 | block_rep->compression_type = compression_type; | |
814 | ||
815 | if (first_key_in_next_block == nullptr) { | |
816 | block_rep->first_key_in_next_block.reset(nullptr); | |
817 | } else { | |
818 | block_rep->first_key_in_next_block->assign( | |
819 | first_key_in_next_block->data(), first_key_in_next_block->size()); | |
820 | } | |
821 | ||
822 | return block_rep; | |
823 | } | |
7c673cae FG |
824 | }; |
825 | ||
826 | BlockBasedTableBuilder::BlockBasedTableBuilder( | |
11fdf7f2 | 827 | const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions, |
7c673cae FG |
828 | const BlockBasedTableOptions& table_options, |
829 | const InternalKeyComparator& internal_comparator, | |
830 | const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* | |
831 | int_tbl_prop_collector_factories, | |
832 | uint32_t column_family_id, WritableFileWriter* file, | |
833 | const CompressionType compression_type, | |
494da23a TL |
834 | const uint64_t sample_for_compression, |
835 | const CompressionOptions& compression_opts, const bool skip_filters, | |
f67539c2 TL |
836 | const std::string& column_family_name, const int level_at_creation, |
837 | const uint64_t creation_time, const uint64_t oldest_key_time, | |
20effc67 TL |
838 | const uint64_t target_file_size, const uint64_t file_creation_time, |
839 | const std::string& db_id, const std::string& db_session_id) { | |
7c673cae FG |
840 | BlockBasedTableOptions sanitized_table_options(table_options); |
841 | if (sanitized_table_options.format_version == 0 && | |
842 | sanitized_table_options.checksum != kCRC32c) { | |
843 | ROCKS_LOG_WARN( | |
844 | ioptions.info_log, | |
845 | "Silently converting format_version to 1 because checksum is " | |
846 | "non-default"); | |
847 | // silently convert format_version to 1 to keep consistent with current | |
848 | // behavior | |
849 | sanitized_table_options.format_version = 1; | |
850 | } | |
851 | ||
20effc67 TL |
852 | rep_ = new Rep( |
853 | ioptions, moptions, sanitized_table_options, internal_comparator, | |
854 | int_tbl_prop_collector_factories, column_family_id, file, | |
855 | compression_type, sample_for_compression, compression_opts, skip_filters, | |
856 | level_at_creation, column_family_name, creation_time, oldest_key_time, | |
857 | target_file_size, file_creation_time, db_id, db_session_id); | |
7c673cae FG |
858 | |
859 | if (rep_->filter_builder != nullptr) { | |
860 | rep_->filter_builder->StartBlock(0); | |
861 | } | |
862 | if (table_options.block_cache_compressed.get() != nullptr) { | |
20effc67 | 863 | BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>( |
7c673cae FG |
864 | table_options.block_cache_compressed.get(), file->writable_file(), |
865 | &rep_->compressed_cache_key_prefix[0], | |
866 | &rep_->compressed_cache_key_prefix_size); | |
867 | } | |
20effc67 TL |
868 | |
869 | if (rep_->IsParallelCompressionEnabled()) { | |
870 | StartParallelCompression(); | |
871 | } | |
7c673cae FG |
872 | } |
873 | ||
874 | BlockBasedTableBuilder::~BlockBasedTableBuilder() { | |
494da23a TL |
875 | // Catch errors where caller forgot to call Finish() |
876 | assert(rep_->state == Rep::State::kClosed); | |
7c673cae FG |
877 | delete rep_; |
878 | } | |
879 | ||
880 | void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { | |
881 | Rep* r = rep_; | |
494da23a | 882 | assert(rep_->state != Rep::State::kClosed); |
7c673cae FG |
883 | if (!ok()) return; |
884 | ValueType value_type = ExtractValueType(key); | |
885 | if (IsValueType(value_type)) { | |
494da23a TL |
886 | #ifndef NDEBUG |
887 | if (r->props.num_entries > r->props.num_range_deletions) { | |
7c673cae FG |
888 | assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0); |
889 | } | |
20effc67 | 890 | #endif // !NDEBUG |
7c673cae FG |
891 | |
892 | auto should_flush = r->flush_block_policy->Update(key, value); | |
893 | if (should_flush) { | |
894 | assert(!r->data_block.empty()); | |
20effc67 | 895 | r->first_key_in_next_block = &key; |
7c673cae FG |
896 | Flush(); |
897 | ||
20effc67 | 898 | if (r->state == Rep::State::kBuffered && r->target_file_size != 0 && |
494da23a TL |
899 | r->data_begin_offset > r->target_file_size) { |
900 | EnterUnbuffered(); | |
901 | } | |
902 | ||
7c673cae FG |
903 | // Add item to index block. |
904 | // We do not emit the index entry for a block until we have seen the | |
905 | // first key for the next data block. This allows us to use shorter | |
906 | // keys in the index block. For example, consider a block boundary | |
907 | // between the keys "the quick brown fox" and "the who". We can use | |
908 | // "the r" as the key for the index block entry since it is >= all | |
909 | // entries in the first block and < all entries in subsequent | |
910 | // blocks. | |
494da23a | 911 | if (ok() && r->state == Rep::State::kUnbuffered) { |
20effc67 TL |
912 | if (r->IsParallelCompressionEnabled()) { |
913 | r->pc_rep->curr_block_keys->Clear(); | |
914 | } else { | |
915 | r->index_builder->AddIndexEntry(&r->last_key, &key, | |
916 | r->pending_handle); | |
917 | } | |
7c673cae FG |
918 | } |
919 | } | |
920 | ||
921 | // Note: PartitionedFilterBlockBuilder requires key being added to filter | |
922 | // builder after being added to index builder. | |
20effc67 TL |
923 | if (r->state == Rep::State::kUnbuffered) { |
924 | if (r->IsParallelCompressionEnabled()) { | |
925 | r->pc_rep->curr_block_keys->PushBack(key); | |
926 | } else { | |
927 | if (r->filter_builder != nullptr) { | |
928 | size_t ts_sz = | |
929 | r->internal_comparator.user_comparator()->timestamp_size(); | |
930 | r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); | |
931 | } | |
932 | } | |
7c673cae FG |
933 | } |
934 | ||
935 | r->last_key.assign(key.data(), key.size()); | |
936 | r->data_block.Add(key, value); | |
494da23a TL |
937 | if (r->state == Rep::State::kBuffered) { |
938 | // Buffer keys to be replayed during `Finish()` once compression | |
939 | // dictionary has been finalized. | |
940 | if (r->data_block_and_keys_buffers.empty() || should_flush) { | |
941 | r->data_block_and_keys_buffers.emplace_back(); | |
942 | } | |
943 | r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString()); | |
944 | } else { | |
20effc67 TL |
945 | if (!r->IsParallelCompressionEnabled()) { |
946 | r->index_builder->OnKeyAdded(key); | |
947 | } | |
494da23a | 948 | } |
20effc67 TL |
949 | // TODO offset passed in is not accurate for parallel compression case |
950 | NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(), | |
7c673cae FG |
951 | r->table_properties_collectors, |
952 | r->ioptions.info_log); | |
953 | ||
954 | } else if (value_type == kTypeRangeDeletion) { | |
7c673cae | 955 | r->range_del_block.Add(key, value); |
20effc67 TL |
956 | // TODO offset passed in is not accurate for parallel compression case |
957 | NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(), | |
7c673cae FG |
958 | r->table_properties_collectors, |
959 | r->ioptions.info_log); | |
960 | } else { | |
961 | assert(false); | |
962 | } | |
494da23a TL |
963 | |
964 | r->props.num_entries++; | |
965 | r->props.raw_key_size += key.size(); | |
966 | r->props.raw_value_size += value.size(); | |
967 | if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) { | |
968 | r->props.num_deletions++; | |
969 | } else if (value_type == kTypeRangeDeletion) { | |
970 | r->props.num_deletions++; | |
971 | r->props.num_range_deletions++; | |
972 | } else if (value_type == kTypeMerge) { | |
973 | r->props.num_merge_operands++; | |
974 | } | |
7c673cae FG |
975 | } |
976 | ||
977 | void BlockBasedTableBuilder::Flush() { | |
978 | Rep* r = rep_; | |
494da23a | 979 | assert(rep_->state != Rep::State::kClosed); |
7c673cae FG |
980 | if (!ok()) return; |
981 | if (r->data_block.empty()) return; | |
20effc67 TL |
982 | if (r->IsParallelCompressionEnabled() && |
983 | r->state == Rep::State::kUnbuffered) { | |
984 | r->data_block.Finish(); | |
985 | ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock( | |
986 | r->compression_type, r->first_key_in_next_block, &(r->data_block)); | |
987 | assert(block_rep != nullptr); | |
988 | r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(), | |
989 | r->get_offset()); | |
990 | r->pc_rep->EmitBlock(block_rep); | |
991 | } else { | |
992 | WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); | |
993 | } | |
7c673cae FG |
994 | } |
995 | ||
996 | void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, | |
997 | BlockHandle* handle, | |
998 | bool is_data_block) { | |
999 | WriteBlock(block->Finish(), handle, is_data_block); | |
1000 | block->Reset(); | |
1001 | } | |
1002 | ||
1003 | void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, | |
1004 | BlockHandle* handle, | |
1005 | bool is_data_block) { | |
20effc67 TL |
1006 | Rep* r = rep_; |
1007 | Slice block_contents; | |
1008 | CompressionType type; | |
1009 | if (r->state == Rep::State::kBuffered) { | |
1010 | assert(is_data_block); | |
1011 | assert(!r->data_block_and_keys_buffers.empty()); | |
1012 | r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString(); | |
1013 | r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size(); | |
1014 | return; | |
1015 | } | |
1016 | Status compress_status; | |
1017 | CompressAndVerifyBlock(raw_block_contents, is_data_block, | |
1018 | *(r->compression_ctxs[0]), r->verify_ctxs[0].get(), | |
1019 | &(r->compressed_output), &(block_contents), &type, | |
1020 | &compress_status); | |
1021 | r->SetStatus(compress_status); | |
1022 | if (!ok()) { | |
1023 | return; | |
1024 | } | |
1025 | WriteRawBlock(block_contents, type, handle, is_data_block); | |
1026 | r->compressed_output.clear(); | |
1027 | if (is_data_block) { | |
1028 | if (r->filter_builder != nullptr) { | |
1029 | r->filter_builder->StartBlock(r->get_offset()); | |
1030 | } | |
1031 | r->props.data_size = r->get_offset(); | |
1032 | ++r->props.num_data_blocks; | |
1033 | } | |
1034 | } | |
1035 | ||
1036 | void BlockBasedTableBuilder::BGWorkCompression( | |
1037 | const CompressionContext& compression_ctx, | |
1038 | UncompressionContext* verify_ctx) { | |
1039 | ParallelCompressionRep::BlockRep* block_rep = nullptr; | |
1040 | while (rep_->pc_rep->compress_queue.pop(block_rep)) { | |
1041 | assert(block_rep != nullptr); | |
1042 | CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/ | |
1043 | compression_ctx, verify_ctx, | |
1044 | block_rep->compressed_data.get(), | |
1045 | &block_rep->compressed_contents, | |
1046 | &(block_rep->compression_type), &block_rep->status); | |
1047 | block_rep->slot->Fill(block_rep); | |
1048 | } | |
1049 | } | |
1050 | ||
1051 | void BlockBasedTableBuilder::CompressAndVerifyBlock( | |
1052 | const Slice& raw_block_contents, bool is_data_block, | |
1053 | const CompressionContext& compression_ctx, UncompressionContext* verify_ctx, | |
1054 | std::string* compressed_output, Slice* block_contents, | |
1055 | CompressionType* type, Status* out_status) { | |
7c673cae FG |
1056 | // File format contains a sequence of blocks where each block has: |
1057 | // block_data: uint8[n] | |
1058 | // type: uint8 | |
1059 | // crc: uint32 | |
7c673cae | 1060 | Rep* r = rep_; |
20effc67 TL |
1061 | bool is_status_ok = ok(); |
1062 | if (!r->IsParallelCompressionEnabled()) { | |
1063 | assert(is_status_ok); | |
1064 | } | |
7c673cae | 1065 | |
20effc67 | 1066 | *type = r->compression_type; |
494da23a | 1067 | uint64_t sample_for_compression = r->sample_for_compression; |
7c673cae FG |
1068 | bool abort_compression = false; |
1069 | ||
494da23a TL |
1070 | StopWatchNano timer( |
1071 | r->ioptions.env, | |
1072 | ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); | |
1073 | ||
20effc67 | 1074 | if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) { |
494da23a TL |
1075 | const CompressionDict* compression_dict; |
1076 | if (!is_data_block || r->compression_dict == nullptr) { | |
1077 | compression_dict = &CompressionDict::GetEmptyDict(); | |
11fdf7f2 | 1078 | } else { |
494da23a | 1079 | compression_dict = r->compression_dict.get(); |
7c673cae | 1080 | } |
494da23a | 1081 | assert(compression_dict != nullptr); |
20effc67 TL |
1082 | CompressionInfo compression_info(r->compression_opts, compression_ctx, |
1083 | *compression_dict, *type, | |
494da23a TL |
1084 | sample_for_compression); |
1085 | ||
1086 | std::string sampled_output_fast; | |
1087 | std::string sampled_output_slow; | |
20effc67 TL |
1088 | *block_contents = CompressBlock( |
1089 | raw_block_contents, compression_info, type, | |
494da23a | 1090 | r->table_options.format_version, is_data_block /* do_sample */, |
20effc67 | 1091 | compressed_output, &sampled_output_fast, &sampled_output_slow); |
494da23a TL |
1092 | |
1093 | // notify collectors on block add | |
1094 | NotifyCollectTableCollectorsOnBlockAdd( | |
1095 | r->table_properties_collectors, raw_block_contents.size(), | |
1096 | sampled_output_fast.size(), sampled_output_slow.size()); | |
7c673cae FG |
1097 | |
1098 | // Some of the compression algorithms are known to be unreliable. If | |
1099 | // the verify_compression flag is set then try to de-compress the | |
1100 | // compressed data and compare to the input. | |
20effc67 | 1101 | if (*type != kNoCompression && r->table_options.verify_compression) { |
7c673cae | 1102 | // Retrieve the uncompressed contents into a new buffer |
494da23a TL |
1103 | const UncompressionDict* verify_dict; |
1104 | if (!is_data_block || r->verify_dict == nullptr) { | |
1105 | verify_dict = &UncompressionDict::GetEmptyDict(); | |
1106 | } else { | |
1107 | verify_dict = r->verify_dict.get(); | |
1108 | } | |
1109 | assert(verify_dict != nullptr); | |
7c673cae | 1110 | BlockContents contents; |
20effc67 | 1111 | UncompressionInfo uncompression_info(*verify_ctx, *verify_dict, |
494da23a | 1112 | r->compression_type); |
7c673cae | 1113 | Status stat = UncompressBlockContentsForCompressionType( |
20effc67 | 1114 | uncompression_info, block_contents->data(), block_contents->size(), |
11fdf7f2 | 1115 | &contents, r->table_options.format_version, r->ioptions); |
7c673cae FG |
1116 | |
1117 | if (stat.ok()) { | |
1118 | bool compressed_ok = contents.data.compare(raw_block_contents) == 0; | |
1119 | if (!compressed_ok) { | |
1120 | // The result of the compression was invalid. abort. | |
1121 | abort_compression = true; | |
1122 | ROCKS_LOG_ERROR(r->ioptions.info_log, | |
1123 | "Decompressed block did not match raw block"); | |
20effc67 | 1124 | *out_status = |
7c673cae FG |
1125 | Status::Corruption("Decompressed block did not match raw block"); |
1126 | } | |
1127 | } else { | |
1128 | // Decompression reported an error. abort. | |
20effc67 TL |
1129 | *out_status = Status::Corruption(std::string("Could not decompress: ") + |
1130 | stat.getState()); | |
7c673cae FG |
1131 | abort_compression = true; |
1132 | } | |
1133 | } | |
1134 | } else { | |
1135 | // Block is too big to be compressed. | |
1136 | abort_compression = true; | |
1137 | } | |
1138 | ||
1139 | // Abort compression if the block is too big, or did not pass | |
1140 | // verification. | |
1141 | if (abort_compression) { | |
1142 | RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); | |
20effc67 TL |
1143 | *type = kNoCompression; |
1144 | *block_contents = raw_block_contents; | |
1145 | } else if (*type != kNoCompression) { | |
11fdf7f2 | 1146 | if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) { |
494da23a TL |
1147 | RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS, |
1148 | timer.ElapsedNanos()); | |
11fdf7f2 | 1149 | } |
494da23a TL |
1150 | RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED, |
1151 | raw_block_contents.size()); | |
7c673cae | 1152 | RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED); |
20effc67 | 1153 | } else if (*type != r->compression_type) { |
494da23a | 1154 | RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); |
7c673cae | 1155 | } |
7c673cae FG |
1156 | } |
1157 | ||
1158 | void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, | |
1159 | CompressionType type, | |
11fdf7f2 TL |
1160 | BlockHandle* handle, |
1161 | bool is_data_block) { | |
7c673cae | 1162 | Rep* r = rep_; |
20effc67 TL |
1163 | Status s = Status::OK(); |
1164 | IOStatus io_s = IOStatus::OK(); | |
7c673cae | 1165 | StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS); |
20effc67 | 1166 | handle->set_offset(r->get_offset()); |
7c673cae | 1167 | handle->set_size(block_contents.size()); |
20effc67 TL |
1168 | assert(status().ok()); |
1169 | assert(io_status().ok()); | |
1170 | io_s = r->file->Append(block_contents); | |
1171 | if (io_s.ok()) { | |
7c673cae FG |
1172 | char trailer[kBlockTrailerSize]; |
1173 | trailer[0] = type; | |
20effc67 | 1174 | uint32_t checksum = 0; |
7c673cae FG |
1175 | switch (r->table_options.checksum) { |
1176 | case kNoChecksum: | |
11fdf7f2 | 1177 | break; |
7c673cae | 1178 | case kCRC32c: { |
20effc67 TL |
1179 | uint32_t crc = |
1180 | crc32c::Value(block_contents.data(), block_contents.size()); | |
1181 | // Extend to cover compression type | |
1182 | crc = crc32c::Extend(crc, trailer, 1); | |
1183 | checksum = crc32c::Mask(crc); | |
7c673cae FG |
1184 | break; |
1185 | } | |
1186 | case kxxHash: { | |
f67539c2 TL |
1187 | XXH32_state_t* const state = XXH32_createState(); |
1188 | XXH32_reset(state, 0); | |
20effc67 TL |
1189 | XXH32_update(state, block_contents.data(), block_contents.size()); |
1190 | // Extend to cover compression type | |
1191 | XXH32_update(state, trailer, 1); | |
1192 | checksum = XXH32_digest(state); | |
f67539c2 | 1193 | XXH32_freeState(state); |
7c673cae FG |
1194 | break; |
1195 | } | |
494da23a TL |
1196 | case kxxHash64: { |
1197 | XXH64_state_t* const state = XXH64_createState(); | |
1198 | XXH64_reset(state, 0); | |
20effc67 TL |
1199 | XXH64_update(state, block_contents.data(), block_contents.size()); |
1200 | // Extend to cover compression type | |
1201 | XXH64_update(state, trailer, 1); | |
1202 | checksum = Lower32of64(XXH64_digest(state)); | |
494da23a TL |
1203 | XXH64_freeState(state); |
1204 | break; | |
1205 | } | |
20effc67 TL |
1206 | default: |
1207 | assert(false); | |
1208 | break; | |
7c673cae | 1209 | } |
20effc67 TL |
1210 | EncodeFixed32(trailer + 1, checksum); |
1211 | assert(io_s.ok()); | |
494da23a TL |
1212 | TEST_SYNC_POINT_CALLBACK( |
1213 | "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum", | |
1214 | static_cast<char*>(trailer)); | |
20effc67 TL |
1215 | io_s = r->file->Append(Slice(trailer, kBlockTrailerSize)); |
1216 | if (io_s.ok()) { | |
1217 | assert(s.ok()); | |
1218 | s = InsertBlockInCache(block_contents, type, handle); | |
1219 | if (!s.ok()) { | |
1220 | r->SetStatus(s); | |
1221 | } | |
1222 | } else { | |
1223 | r->SetIOStatus(io_s); | |
7c673cae | 1224 | } |
20effc67 TL |
1225 | if (s.ok() && io_s.ok()) { |
1226 | r->set_offset(r->get_offset() + block_contents.size() + | |
1227 | kBlockTrailerSize); | |
11fdf7f2 TL |
1228 | if (r->table_options.block_align && is_data_block) { |
1229 | size_t pad_bytes = | |
1230 | (r->alignment - ((block_contents.size() + kBlockTrailerSize) & | |
1231 | (r->alignment - 1))) & | |
1232 | (r->alignment - 1); | |
20effc67 TL |
1233 | io_s = r->file->Pad(pad_bytes); |
1234 | if (io_s.ok()) { | |
1235 | r->set_offset(r->get_offset() + pad_bytes); | |
1236 | } else { | |
1237 | r->SetIOStatus(io_s); | |
1238 | } | |
1239 | } | |
1240 | if (r->IsParallelCompressionEnabled()) { | |
1241 | if (is_data_block) { | |
1242 | r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(), | |
1243 | r->get_offset()); | |
1244 | } else { | |
1245 | r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset()); | |
11fdf7f2 TL |
1246 | } |
1247 | } | |
7c673cae | 1248 | } |
20effc67 TL |
1249 | } else { |
1250 | r->SetIOStatus(io_s); | |
1251 | } | |
1252 | if (!io_s.ok() && s.ok()) { | |
1253 | r->SetStatus(io_s); | |
7c673cae FG |
1254 | } |
1255 | } | |
1256 | ||
20effc67 TL |
1257 | void BlockBasedTableBuilder::BGWorkWriteRawBlock() { |
1258 | Rep* r = rep_; | |
1259 | ParallelCompressionRep::BlockRepSlot* slot = nullptr; | |
1260 | ParallelCompressionRep::BlockRep* block_rep = nullptr; | |
1261 | while (r->pc_rep->write_queue.pop(slot)) { | |
1262 | assert(slot != nullptr); | |
1263 | slot->Take(block_rep); | |
1264 | assert(block_rep != nullptr); | |
1265 | if (!block_rep->status.ok()) { | |
1266 | r->SetStatus(block_rep->status); | |
1267 | // Reap block so that blocked Flush() can finish | |
1268 | // if there is one, and Flush() will notice !ok() next time. | |
1269 | block_rep->status = Status::OK(); | |
1270 | r->pc_rep->ReapBlock(block_rep); | |
1271 | continue; | |
1272 | } | |
1273 | ||
1274 | for (size_t i = 0; i < block_rep->keys->Size(); i++) { | |
1275 | auto& key = (*block_rep->keys)[i]; | |
1276 | if (r->filter_builder != nullptr) { | |
1277 | size_t ts_sz = | |
1278 | r->internal_comparator.user_comparator()->timestamp_size(); | |
1279 | r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); | |
1280 | } | |
1281 | r->index_builder->OnKeyAdded(key); | |
1282 | } | |
1283 | ||
1284 | r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size()); | |
1285 | WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type, | |
1286 | &r->pending_handle, true /* is_data_block*/); | |
1287 | if (!ok()) { | |
1288 | break; | |
1289 | } | |
1290 | ||
1291 | if (r->filter_builder != nullptr) { | |
1292 | r->filter_builder->StartBlock(r->get_offset()); | |
1293 | } | |
1294 | r->props.data_size = r->get_offset(); | |
1295 | ++r->props.num_data_blocks; | |
1296 | ||
1297 | if (block_rep->first_key_in_next_block == nullptr) { | |
1298 | r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr, | |
1299 | r->pending_handle); | |
1300 | } else { | |
1301 | Slice first_key_in_next_block = | |
1302 | Slice(*block_rep->first_key_in_next_block); | |
1303 | r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), | |
1304 | &first_key_in_next_block, | |
1305 | r->pending_handle); | |
1306 | } | |
1307 | ||
1308 | r->pc_rep->ReapBlock(block_rep); | |
1309 | } | |
1310 | } | |
1311 | ||
1312 | void BlockBasedTableBuilder::StartParallelCompression() { | |
1313 | rep_->pc_rep.reset( | |
1314 | new ParallelCompressionRep(rep_->compression_opts.parallel_threads)); | |
1315 | rep_->pc_rep->compress_thread_pool.reserve( | |
1316 | rep_->compression_opts.parallel_threads); | |
1317 | for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) { | |
1318 | rep_->pc_rep->compress_thread_pool.emplace_back([this, i] { | |
1319 | BGWorkCompression(*(rep_->compression_ctxs[i]), | |
1320 | rep_->verify_ctxs[i].get()); | |
1321 | }); | |
1322 | } | |
1323 | rep_->pc_rep->write_thread.reset( | |
1324 | new port::Thread([this] { BGWorkWriteRawBlock(); })); | |
1325 | } | |
1326 | ||
1327 | void BlockBasedTableBuilder::StopParallelCompression() { | |
1328 | rep_->pc_rep->compress_queue.finish(); | |
1329 | for (auto& thread : rep_->pc_rep->compress_thread_pool) { | |
1330 | thread.join(); | |
1331 | } | |
1332 | rep_->pc_rep->write_queue.finish(); | |
1333 | rep_->pc_rep->write_thread->join(); | |
1334 | } | |
1335 | ||
1336 | Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); } | |
1337 | ||
1338 | IOStatus BlockBasedTableBuilder::io_status() const { | |
1339 | return rep_->GetIOStatus(); | |
1340 | } | |
7c673cae | 1341 | |
494da23a TL |
1342 | static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) { |
1343 | BlockContents* bc = reinterpret_cast<BlockContents*>(value); | |
1344 | delete bc; | |
7c673cae FG |
1345 | } |
1346 | ||
1347 | // | |
1348 | // Make a copy of the block contents and insert into compressed block cache | |
1349 | // | |
1350 | Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, | |
1351 | const CompressionType type, | |
1352 | const BlockHandle* handle) { | |
1353 | Rep* r = rep_; | |
1354 | Cache* block_cache_compressed = r->table_options.block_cache_compressed.get(); | |
1355 | ||
1356 | if (type != kNoCompression && block_cache_compressed != nullptr) { | |
7c673cae FG |
1357 | size_t size = block_contents.size(); |
1358 | ||
494da23a TL |
1359 | auto ubuf = |
1360 | AllocateBlock(size + 1, block_cache_compressed->memory_allocator()); | |
7c673cae FG |
1361 | memcpy(ubuf.get(), block_contents.data(), size); |
1362 | ubuf[size] = type; | |
1363 | ||
494da23a TL |
1364 | BlockContents* block_contents_to_cache = |
1365 | new BlockContents(std::move(ubuf), size); | |
1366 | #ifndef NDEBUG | |
1367 | block_contents_to_cache->is_raw_block = true; | |
1368 | #endif // NDEBUG | |
7c673cae FG |
1369 | |
1370 | // make cache key by appending the file offset to the cache prefix id | |
1371 | char* end = EncodeVarint64( | |
494da23a TL |
1372 | r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size, |
1373 | handle->offset()); | |
1374 | Slice key(r->compressed_cache_key_prefix, | |
1375 | static_cast<size_t>(end - r->compressed_cache_key_prefix)); | |
7c673cae FG |
1376 | |
1377 | // Insert into compressed block cache. | |
20effc67 TL |
1378 | // How should we deal with compressed cache full? |
1379 | block_cache_compressed | |
1380 | ->Insert(key, block_contents_to_cache, | |
1381 | block_contents_to_cache->ApproximateMemoryUsage(), | |
1382 | &DeleteCachedBlockContents) | |
1383 | .PermitUncheckedError(); | |
7c673cae FG |
1384 | |
1385 | // Invalidate OS cache. | |
20effc67 TL |
1386 | r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size) |
1387 | .PermitUncheckedError(); | |
7c673cae FG |
1388 | } |
1389 | return Status::OK(); | |
1390 | } | |
1391 | ||
11fdf7f2 TL |
1392 | void BlockBasedTableBuilder::WriteFilterBlock( |
1393 | MetaIndexBuilder* meta_index_builder) { | |
1394 | BlockHandle filter_block_handle; | |
1395 | bool empty_filter_block = (rep_->filter_builder == nullptr || | |
1396 | rep_->filter_builder->NumAdded() == 0); | |
1397 | if (ok() && !empty_filter_block) { | |
7c673cae | 1398 | Status s = Status::Incomplete(); |
11fdf7f2 TL |
1399 | while (ok() && s.IsIncomplete()) { |
1400 | Slice filter_content = | |
1401 | rep_->filter_builder->Finish(filter_block_handle, &s); | |
7c673cae | 1402 | assert(s.ok() || s.IsIncomplete()); |
11fdf7f2 | 1403 | rep_->props.filter_size += filter_content.size(); |
7c673cae FG |
1404 | WriteRawBlock(filter_content, kNoCompression, &filter_block_handle); |
1405 | } | |
1406 | } | |
11fdf7f2 TL |
1407 | if (ok() && !empty_filter_block) { |
1408 | // Add mapping from "<filter_block_prefix>.Name" to location | |
1409 | // of filter data. | |
1410 | std::string key; | |
1411 | if (rep_->filter_builder->IsBlockBased()) { | |
1412 | key = BlockBasedTable::kFilterBlockPrefix; | |
1413 | } else { | |
1414 | key = rep_->table_options.partition_filters | |
1415 | ? BlockBasedTable::kPartitionedFilterBlockPrefix | |
1416 | : BlockBasedTable::kFullFilterBlockPrefix; | |
1417 | } | |
1418 | key.append(rep_->table_options.filter_policy->Name()); | |
1419 | meta_index_builder->Add(key, filter_block_handle); | |
1420 | } | |
1421 | } | |
7c673cae | 1422 | |
11fdf7f2 TL |
1423 | void BlockBasedTableBuilder::WriteIndexBlock( |
1424 | MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) { | |
7c673cae | 1425 | IndexBuilder::IndexBlocks index_blocks; |
11fdf7f2 | 1426 | auto index_builder_status = rep_->index_builder->Finish(&index_blocks); |
7c673cae FG |
1427 | if (index_builder_status.IsIncomplete()) { |
1428 | // We we have more than one index partition then meta_blocks are not | |
1429 | // supported for the index. Currently meta_blocks are used only by | |
1430 | // HashIndexBuilder which is not multi-partition. | |
1431 | assert(index_blocks.meta_blocks.empty()); | |
11fdf7f2 | 1432 | } else if (ok() && !index_builder_status.ok()) { |
20effc67 | 1433 | rep_->SetStatus(index_builder_status); |
7c673cae | 1434 | } |
11fdf7f2 TL |
1435 | if (ok()) { |
1436 | for (const auto& item : index_blocks.meta_blocks) { | |
1437 | BlockHandle block_handle; | |
1438 | WriteBlock(item.second, &block_handle, false /* is_data_block */); | |
1439 | if (!ok()) { | |
1440 | break; | |
1441 | } | |
1442 | meta_index_builder->Add(item.first, block_handle); | |
1443 | } | |
7c673cae | 1444 | } |
11fdf7f2 TL |
1445 | if (ok()) { |
1446 | if (rep_->table_options.enable_index_compression) { | |
1447 | WriteBlock(index_blocks.index_block_contents, index_block_handle, false); | |
1448 | } else { | |
1449 | WriteRawBlock(index_blocks.index_block_contents, kNoCompression, | |
1450 | index_block_handle); | |
1451 | } | |
1452 | } | |
1453 | // If there are more index partitions, finish them and write them out | |
1454 | Status s = index_builder_status; | |
1455 | while (ok() && s.IsIncomplete()) { | |
1456 | s = rep_->index_builder->Finish(&index_blocks, *index_block_handle); | |
1457 | if (!s.ok() && !s.IsIncomplete()) { | |
20effc67 | 1458 | rep_->SetStatus(s); |
11fdf7f2 TL |
1459 | return; |
1460 | } | |
1461 | if (rep_->table_options.enable_index_compression) { | |
1462 | WriteBlock(index_blocks.index_block_contents, index_block_handle, false); | |
1463 | } else { | |
1464 | WriteRawBlock(index_blocks.index_block_contents, kNoCompression, | |
1465 | index_block_handle); | |
1466 | } | |
1467 | // The last index_block_handle will be for the partition index block | |
1468 | } | |
1469 | } | |
7c673cae | 1470 | |
11fdf7f2 TL |
1471 | void BlockBasedTableBuilder::WritePropertiesBlock( |
1472 | MetaIndexBuilder* meta_index_builder) { | |
1473 | BlockHandle properties_block_handle; | |
7c673cae | 1474 | if (ok()) { |
11fdf7f2 TL |
1475 | PropertyBlockBuilder property_block_builder; |
1476 | rep_->props.column_family_id = rep_->column_family_id; | |
1477 | rep_->props.column_family_name = rep_->column_family_name; | |
1478 | rep_->props.filter_policy_name = | |
1479 | rep_->table_options.filter_policy != nullptr | |
1480 | ? rep_->table_options.filter_policy->Name() | |
1481 | : ""; | |
1482 | rep_->props.index_size = | |
1483 | rep_->index_builder->IndexSize() + kBlockTrailerSize; | |
1484 | rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr | |
1485 | ? rep_->ioptions.user_comparator->Name() | |
1486 | : "nullptr"; | |
1487 | rep_->props.merge_operator_name = | |
1488 | rep_->ioptions.merge_operator != nullptr | |
1489 | ? rep_->ioptions.merge_operator->Name() | |
1490 | : "nullptr"; | |
1491 | rep_->props.compression_name = | |
494da23a TL |
1492 | CompressionTypeToString(rep_->compression_type); |
1493 | rep_->props.compression_options = | |
1494 | CompressionOptionsToString(rep_->compression_opts); | |
11fdf7f2 TL |
1495 | rep_->props.prefix_extractor_name = |
1496 | rep_->moptions.prefix_extractor != nullptr | |
1497 | ? rep_->moptions.prefix_extractor->Name() | |
1498 | : "nullptr"; | |
1499 | ||
1500 | std::string property_collectors_names = "["; | |
1501 | for (size_t i = 0; | |
1502 | i < rep_->ioptions.table_properties_collector_factories.size(); ++i) { | |
1503 | if (i != 0) { | |
1504 | property_collectors_names += ","; | |
7c673cae | 1505 | } |
11fdf7f2 TL |
1506 | property_collectors_names += |
1507 | rep_->ioptions.table_properties_collector_factories[i]->Name(); | |
1508 | } | |
1509 | property_collectors_names += "]"; | |
1510 | rep_->props.property_collectors_names = property_collectors_names; | |
1511 | if (rep_->table_options.index_type == | |
1512 | BlockBasedTableOptions::kTwoLevelIndexSearch) { | |
1513 | assert(rep_->p_index_builder_ != nullptr); | |
1514 | rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions(); | |
1515 | rep_->props.top_level_index_size = | |
1516 | rep_->p_index_builder_->TopLevelIndexSize(rep_->offset); | |
1517 | } | |
1518 | rep_->props.index_key_is_user_key = | |
1519 | !rep_->index_builder->seperator_is_key_plus_seq(); | |
1520 | rep_->props.index_value_is_delta_encoded = | |
1521 | rep_->use_delta_encoding_for_index_values; | |
1522 | rep_->props.creation_time = rep_->creation_time; | |
1523 | rep_->props.oldest_key_time = rep_->oldest_key_time; | |
f67539c2 | 1524 | rep_->props.file_creation_time = rep_->file_creation_time; |
20effc67 TL |
1525 | rep_->props.db_id = rep_->db_id; |
1526 | rep_->props.db_session_id = rep_->db_session_id; | |
1527 | rep_->props.db_host_id = rep_->db_host_id; | |
11fdf7f2 TL |
1528 | |
1529 | // Add basic properties | |
1530 | property_block_builder.AddTableProperty(rep_->props); | |
1531 | ||
1532 | // Add use collected properties | |
1533 | NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors, | |
1534 | rep_->ioptions.info_log, | |
1535 | &property_block_builder); | |
1536 | ||
1537 | WriteRawBlock(property_block_builder.Finish(), kNoCompression, | |
1538 | &properties_block_handle); | |
1539 | } | |
1540 | if (ok()) { | |
494da23a TL |
1541 | #ifndef NDEBUG |
1542 | { | |
1543 | uint64_t props_block_offset = properties_block_handle.offset(); | |
1544 | uint64_t props_block_size = properties_block_handle.size(); | |
1545 | TEST_SYNC_POINT_CALLBACK( | |
1546 | "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset", | |
1547 | &props_block_offset); | |
1548 | TEST_SYNC_POINT_CALLBACK( | |
1549 | "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize", | |
1550 | &props_block_size); | |
1551 | } | |
1552 | #endif // !NDEBUG | |
11fdf7f2 TL |
1553 | meta_index_builder->Add(kPropertiesBlock, properties_block_handle); |
1554 | } | |
1555 | } | |
1556 | ||
1557 | void BlockBasedTableBuilder::WriteCompressionDictBlock( | |
1558 | MetaIndexBuilder* meta_index_builder) { | |
494da23a TL |
1559 | if (rep_->compression_dict != nullptr && |
1560 | rep_->compression_dict->GetRawDict().size()) { | |
11fdf7f2 TL |
1561 | BlockHandle compression_dict_block_handle; |
1562 | if (ok()) { | |
494da23a | 1563 | WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression, |
11fdf7f2 | 1564 | &compression_dict_block_handle); |
494da23a TL |
1565 | #ifndef NDEBUG |
1566 | Slice compression_dict = rep_->compression_dict->GetRawDict(); | |
1567 | TEST_SYNC_POINT_CALLBACK( | |
1568 | "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict", | |
1569 | &compression_dict); | |
1570 | #endif // NDEBUG | |
11fdf7f2 TL |
1571 | } |
1572 | if (ok()) { | |
1573 | meta_index_builder->Add(kCompressionDictBlock, | |
1574 | compression_dict_block_handle); | |
7c673cae | 1575 | } |
11fdf7f2 TL |
1576 | } |
1577 | } | |
7c673cae | 1578 | |
11fdf7f2 TL |
1579 | void BlockBasedTableBuilder::WriteRangeDelBlock( |
1580 | MetaIndexBuilder* meta_index_builder) { | |
1581 | if (ok() && !rep_->range_del_block.empty()) { | |
1582 | BlockHandle range_del_block_handle; | |
1583 | WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression, | |
1584 | &range_del_block_handle); | |
1585 | meta_index_builder->Add(kRangeDelBlock, range_del_block_handle); | |
1586 | } | |
1587 | } | |
7c673cae | 1588 | |
494da23a TL |
1589 | void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle, |
1590 | BlockHandle& index_block_handle) { | |
1591 | Rep* r = rep_; | |
1592 | // No need to write out new footer if we're using default checksum. | |
1593 | // We're writing legacy magic number because we want old versions of RocksDB | |
1594 | // be able to read files generated with new release (just in case if | |
1595 | // somebody wants to roll back after an upgrade) | |
1596 | // TODO(icanadi) at some point in the future, when we're absolutely sure | |
1597 | // nobody will roll back to RocksDB 2.x versions, retire the legacy magic | |
1598 | // number and always write new table files with new magic number | |
1599 | bool legacy = (r->table_options.format_version == 0); | |
1600 | // this is guaranteed by BlockBasedTableBuilder's constructor | |
1601 | assert(r->table_options.checksum == kCRC32c || | |
1602 | r->table_options.format_version != 0); | |
1603 | Footer footer( | |
1604 | legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber, | |
1605 | r->table_options.format_version); | |
1606 | footer.set_metaindex_handle(metaindex_block_handle); | |
1607 | footer.set_index_handle(index_block_handle); | |
1608 | footer.set_checksum(r->table_options.checksum); | |
1609 | std::string footer_encoding; | |
1610 | footer.EncodeTo(&footer_encoding); | |
20effc67 TL |
1611 | assert(ok()); |
1612 | IOStatus ios = r->file->Append(footer_encoding); | |
1613 | if (ios.ok()) { | |
1614 | r->set_offset(r->get_offset() + footer_encoding.size()); | |
1615 | } else { | |
1616 | r->SetIOStatus(ios); | |
1617 | r->SetStatus(ios); | |
494da23a TL |
1618 | } |
1619 | } | |
1620 | ||
1621 | void BlockBasedTableBuilder::EnterUnbuffered() { | |
1622 | Rep* r = rep_; | |
1623 | assert(r->state == Rep::State::kBuffered); | |
1624 | r->state = Rep::State::kUnbuffered; | |
1625 | const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0 | |
1626 | ? r->compression_opts.zstd_max_train_bytes | |
1627 | : r->compression_opts.max_dict_bytes; | |
1628 | Random64 generator{r->creation_time}; | |
1629 | std::string compression_dict_samples; | |
1630 | std::vector<size_t> compression_dict_sample_lens; | |
1631 | if (!r->data_block_and_keys_buffers.empty()) { | |
1632 | while (compression_dict_samples.size() < kSampleBytes) { | |
1633 | size_t rand_idx = | |
f67539c2 TL |
1634 | static_cast<size_t>( |
1635 | generator.Uniform(r->data_block_and_keys_buffers.size())); | |
494da23a TL |
1636 | size_t copy_len = |
1637 | std::min(kSampleBytes - compression_dict_samples.size(), | |
1638 | r->data_block_and_keys_buffers[rand_idx].first.size()); | |
1639 | compression_dict_samples.append( | |
1640 | r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len); | |
1641 | compression_dict_sample_lens.emplace_back(copy_len); | |
1642 | } | |
1643 | } | |
1644 | ||
1645 | // final data block flushed, now we can generate dictionary from the samples. | |
1646 | // OK if compression_dict_samples is empty, we'll just get empty dictionary. | |
1647 | std::string dict; | |
1648 | if (r->compression_opts.zstd_max_train_bytes > 0) { | |
1649 | dict = ZSTD_TrainDictionary(compression_dict_samples, | |
1650 | compression_dict_sample_lens, | |
1651 | r->compression_opts.max_dict_bytes); | |
1652 | } else { | |
1653 | dict = std::move(compression_dict_samples); | |
1654 | } | |
1655 | r->compression_dict.reset(new CompressionDict(dict, r->compression_type, | |
1656 | r->compression_opts.level)); | |
1657 | r->verify_dict.reset(new UncompressionDict( | |
1658 | dict, r->compression_type == kZSTD || | |
1659 | r->compression_type == kZSTDNotFinalCompression)); | |
1660 | ||
1661 | for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) { | |
20effc67 | 1662 | auto& data_block = r->data_block_and_keys_buffers[i].first; |
494da23a TL |
1663 | auto& keys = r->data_block_and_keys_buffers[i].second; |
1664 | assert(!data_block.empty()); | |
1665 | assert(!keys.empty()); | |
1666 | ||
20effc67 TL |
1667 | if (r->IsParallelCompressionEnabled()) { |
1668 | Slice first_key_in_next_block; | |
1669 | const Slice* first_key_in_next_block_ptr = &first_key_in_next_block; | |
1670 | if (i + 1 < r->data_block_and_keys_buffers.size()) { | |
1671 | first_key_in_next_block = | |
1672 | r->data_block_and_keys_buffers[i + 1].second.front(); | |
1673 | } else { | |
1674 | first_key_in_next_block_ptr = r->first_key_in_next_block; | |
1675 | } | |
1676 | ||
1677 | ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock( | |
1678 | r->compression_type, first_key_in_next_block_ptr, &data_block, &keys); | |
1679 | assert(block_rep != nullptr); | |
1680 | r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(), | |
1681 | r->get_offset()); | |
1682 | r->pc_rep->EmitBlock(block_rep); | |
1683 | } else { | |
1684 | for (const auto& key : keys) { | |
1685 | if (r->filter_builder != nullptr) { | |
1686 | size_t ts_sz = | |
1687 | r->internal_comparator.user_comparator()->timestamp_size(); | |
1688 | r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); | |
1689 | } | |
1690 | r->index_builder->OnKeyAdded(key); | |
1691 | } | |
1692 | WriteBlock(Slice(data_block), &r->pending_handle, | |
1693 | true /* is_data_block */); | |
1694 | if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) { | |
1695 | Slice first_key_in_next_block = | |
1696 | r->data_block_and_keys_buffers[i + 1].second.front(); | |
1697 | Slice* first_key_in_next_block_ptr = &first_key_in_next_block; | |
1698 | r->index_builder->AddIndexEntry( | |
1699 | &keys.back(), first_key_in_next_block_ptr, r->pending_handle); | |
494da23a | 1700 | } |
494da23a TL |
1701 | } |
1702 | } | |
1703 | r->data_block_and_keys_buffers.clear(); | |
1704 | } | |
1705 | ||
11fdf7f2 TL |
1706 | Status BlockBasedTableBuilder::Finish() { |
1707 | Rep* r = rep_; | |
494da23a | 1708 | assert(r->state != Rep::State::kClosed); |
11fdf7f2 | 1709 | bool empty_data_block = r->data_block.empty(); |
20effc67 | 1710 | r->first_key_in_next_block = nullptr; |
11fdf7f2 | 1711 | Flush(); |
494da23a TL |
1712 | if (r->state == Rep::State::kBuffered) { |
1713 | EnterUnbuffered(); | |
1714 | } | |
20effc67 TL |
1715 | if (r->IsParallelCompressionEnabled()) { |
1716 | StopParallelCompression(); | |
1717 | #ifndef NDEBUG | |
1718 | for (const auto& br : r->pc_rep->block_rep_buf) { | |
1719 | assert(br.status.ok()); | |
1720 | } | |
1721 | #endif // !NDEBUG | |
1722 | } else { | |
1723 | // To make sure properties block is able to keep the accurate size of index | |
1724 | // block, we will finish writing all index entries first. | |
1725 | if (ok() && !empty_data_block) { | |
1726 | r->index_builder->AddIndexEntry( | |
1727 | &r->last_key, nullptr /* no next data block */, r->pending_handle); | |
1728 | } | |
11fdf7f2 | 1729 | } |
7c673cae | 1730 | |
494da23a | 1731 | // Write meta blocks, metaindex block and footer in the following order. |
11fdf7f2 TL |
1732 | // 1. [meta block: filter] |
1733 | // 2. [meta block: index] | |
1734 | // 3. [meta block: compression dictionary] | |
1735 | // 4. [meta block: range deletion tombstone] | |
1736 | // 5. [meta block: properties] | |
1737 | // 6. [metaindex block] | |
494da23a | 1738 | // 7. Footer |
11fdf7f2 TL |
1739 | BlockHandle metaindex_block_handle, index_block_handle; |
1740 | MetaIndexBuilder meta_index_builder; | |
1741 | WriteFilterBlock(&meta_index_builder); | |
1742 | WriteIndexBlock(&meta_index_builder, &index_block_handle); | |
1743 | WriteCompressionDictBlock(&meta_index_builder); | |
1744 | WriteRangeDelBlock(&meta_index_builder); | |
1745 | WritePropertiesBlock(&meta_index_builder); | |
7c673cae FG |
1746 | if (ok()) { |
1747 | // flush the meta index block | |
1748 | WriteRawBlock(meta_index_builder.Finish(), kNoCompression, | |
1749 | &metaindex_block_handle); | |
7c673cae | 1750 | } |
7c673cae | 1751 | if (ok()) { |
494da23a | 1752 | WriteFooter(metaindex_block_handle, index_block_handle); |
7c673cae | 1753 | } |
494da23a | 1754 | r->state = Rep::State::kClosed; |
20effc67 TL |
1755 | r->SetStatus(r->CopyIOStatus()); |
1756 | Status ret_status = r->CopyStatus(); | |
1757 | assert(!ret_status.ok() || io_status().ok()); | |
1758 | return ret_status; | |
7c673cae FG |
1759 | } |
1760 | ||
1761 | void BlockBasedTableBuilder::Abandon() { | |
494da23a | 1762 | assert(rep_->state != Rep::State::kClosed); |
20effc67 TL |
1763 | if (rep_->IsParallelCompressionEnabled()) { |
1764 | StopParallelCompression(); | |
1765 | } | |
494da23a | 1766 | rep_->state = Rep::State::kClosed; |
20effc67 TL |
1767 | rep_->CopyStatus().PermitUncheckedError(); |
1768 | rep_->CopyIOStatus().PermitUncheckedError(); | |
7c673cae FG |
1769 | } |
1770 | ||
1771 | uint64_t BlockBasedTableBuilder::NumEntries() const { | |
1772 | return rep_->props.num_entries; | |
1773 | } | |
1774 | ||
20effc67 TL |
1775 | bool BlockBasedTableBuilder::IsEmpty() const { |
1776 | return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0; | |
1777 | } | |
1778 | ||
494da23a | 1779 | uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } |
7c673cae | 1780 | |
20effc67 TL |
1781 | uint64_t BlockBasedTableBuilder::EstimatedFileSize() const { |
1782 | if (rep_->IsParallelCompressionEnabled()) { | |
1783 | // Use compression ratio so far and inflight raw bytes to estimate | |
1784 | // final SST size. | |
1785 | return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize(); | |
1786 | } else { | |
1787 | return FileSize(); | |
1788 | } | |
1789 | } | |
1790 | ||
7c673cae FG |
1791 | bool BlockBasedTableBuilder::NeedCompact() const { |
1792 | for (const auto& collector : rep_->table_properties_collectors) { | |
1793 | if (collector->NeedCompact()) { | |
1794 | return true; | |
1795 | } | |
1796 | } | |
1797 | return false; | |
1798 | } | |
1799 | ||
1800 | TableProperties BlockBasedTableBuilder::GetTableProperties() const { | |
1801 | TableProperties ret = rep_->props; | |
1802 | for (const auto& collector : rep_->table_properties_collectors) { | |
1803 | for (const auto& prop : collector->GetReadableProperties()) { | |
1804 | ret.readable_properties.insert(prop); | |
1805 | } | |
20effc67 | 1806 | collector->Finish(&ret.user_collected_properties).PermitUncheckedError(); |
7c673cae FG |
1807 | } |
1808 | return ret; | |
1809 | } | |
1810 | ||
20effc67 TL |
1811 | std::string BlockBasedTableBuilder::GetFileChecksum() const { |
1812 | if (rep_->file != nullptr) { | |
1813 | return rep_->file->GetFileChecksum(); | |
1814 | } else { | |
1815 | return kUnknownFileChecksum; | |
1816 | } | |
1817 | } | |
1818 | ||
f67539c2 TL |
1819 | const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const { |
1820 | if (rep_->file != nullptr) { | |
1821 | return rep_->file->GetFileChecksumFuncName(); | |
1822 | } else { | |
20effc67 | 1823 | return kUnknownFileChecksumFuncName; |
f67539c2 TL |
1824 | } |
1825 | } | |
1826 | ||
7c673cae FG |
1827 | const std::string BlockBasedTable::kFilterBlockPrefix = "filter."; |
1828 | const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter."; | |
1829 | const std::string BlockBasedTable::kPartitionedFilterBlockPrefix = | |
1830 | "partitionedfilter."; | |
f67539c2 | 1831 | } // namespace ROCKSDB_NAMESPACE |