1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
9 #include <unordered_map>
11 #include "file/random_access_file_reader.h"
12 #include "file/writable_file_writer.h"
13 #include "rocksdb/utilities/cache_dump_load.h"
14 #include "table/block_based/block.h"
15 #include "table/block_based/block_like_traits.h"
16 #include "table/block_based/block_type.h"
17 #include "table/block_based/cachable_entry.h"
18 #include "table/block_based/parsed_full_filter_block.h"
19 #include "table/block_based/reader_common.h"
21 namespace ROCKSDB_NAMESPACE
{
23 // the read buffer size of for the default CacheDumpReader
24 const unsigned int kDumpReaderBufferSize
= 1024; // 1KB
25 static const unsigned int kSizePrefixLen
= 4;
27 enum CacheDumpUnitType
: unsigned char {
33 kCompressionDictionary
= 6,
35 kHashIndexPrefixes
= 8,
36 kHashIndexMetadata
= 9,
39 kDeprecatedFilterBlock
= 12, // OBSOLETE / DEPRECATED
40 kFilterMetaBlock
= 13,
44 // The metadata of a dump unit. After it is serilized, its size is fixed 16
47 // sequence number is a monotonically increasing number to indicate the order
48 // of the blocks being written. Header is 0.
49 uint32_t sequence_num
;
50 // The Crc32c checksum of its dump unit.
51 uint32_t dump_unit_checksum
;
52 // The dump unit size after the dump unit is serilized to a string.
53 uint64_t dump_unit_size
;
57 dump_unit_checksum
= 0;
62 // The data structure to hold a block and its information.
64 // The timestamp when the block is identified, copied, and dumped from block
67 // The type of the block
68 CacheDumpUnitType type
;
69 // The key of this block when the block is referenced by this Cache
73 // The Crc32c checksum of the block
74 uint32_t value_checksum
;
75 // Pointer to the block. Note that, in the dump process, it points to a memory
76 // buffer copied from cache block. The buffer is freed when we process the
77 // next block. In the load process, we use an std::string to store the
78 // serialized dump_unit read from the reader. So it points to the memory
79 // address of the begin of the block in this string.
82 DumpUnit() { reset(); }
86 type
= CacheDumpUnitType::kBlockTypeMax
;
94 // The default implementation of the Cache Dumper
95 class CacheDumperImpl
: public CacheDumper
{
97 CacheDumperImpl(const CacheDumpOptions
& dump_options
,
98 const std::shared_ptr
<Cache
>& cache
,
99 std::unique_ptr
<CacheDumpWriter
>&& writer
)
100 : options_(dump_options
), cache_(cache
), writer_(std::move(writer
)) {}
101 ~CacheDumperImpl() { writer_
.reset(); }
102 Status
SetDumpFilter(std::vector
<DB
*> db_list
) override
;
103 IOStatus
DumpCacheEntriesToWriter() override
;
106 IOStatus
WriteBlock(CacheDumpUnitType type
, const Slice
& key
,
108 IOStatus
WriteHeader();
109 IOStatus
WriteFooter();
110 bool ShouldFilterOut(const Slice
& key
);
111 std::function
<void(const Slice
&, void*, size_t, Cache::DeleterFn
)>
112 DumpOneBlockCallBack();
114 CacheDumpOptions options_
;
115 std::shared_ptr
<Cache
> cache_
;
116 std::unique_ptr
<CacheDumpWriter
> writer_
;
117 UnorderedMap
<Cache::DeleterFn
, CacheEntryRole
> role_map_
;
119 uint32_t sequence_num_
;
120 // The cache key prefix filter. Currently, we use db_session_id as the prefix,
121 // so using std::set to store the prefixes as filter is enough. Further
122 // improvement can be applied like BloomFilter or others to speedup the
124 std::set
<std::string
> prefix_filter_
;
127 // The default implementation of CacheDumpedLoader
128 class CacheDumpedLoaderImpl
: public CacheDumpedLoader
{
130 CacheDumpedLoaderImpl(const CacheDumpOptions
& dump_options
,
131 const BlockBasedTableOptions
& /*toptions*/,
132 const std::shared_ptr
<SecondaryCache
>& secondary_cache
,
133 std::unique_ptr
<CacheDumpReader
>&& reader
)
134 : options_(dump_options
),
135 secondary_cache_(secondary_cache
),
136 reader_(std::move(reader
)) {}
137 ~CacheDumpedLoaderImpl() {}
138 IOStatus
RestoreCacheEntriesToSecondaryCache() override
;
141 IOStatus
ReadDumpUnitMeta(std::string
* data
, DumpUnitMeta
* unit_meta
);
142 IOStatus
ReadDumpUnit(size_t len
, std::string
* data
, DumpUnit
* unit
);
143 IOStatus
ReadHeader(std::string
* data
, DumpUnit
* dump_unit
);
144 IOStatus
ReadCacheBlock(std::string
* data
, DumpUnit
* dump_unit
);
146 CacheDumpOptions options_
;
147 std::shared_ptr
<SecondaryCache
> secondary_cache_
;
148 std::unique_ptr
<CacheDumpReader
> reader_
;
149 UnorderedMap
<Cache::DeleterFn
, CacheEntryRole
> role_map_
;
152 // The default implementation of CacheDumpWriter. We write the blocks to a file
154 class ToFileCacheDumpWriter
: public CacheDumpWriter
{
156 explicit ToFileCacheDumpWriter(
157 std::unique_ptr
<WritableFileWriter
>&& file_writer
)
158 : file_writer_(std::move(file_writer
)) {}
160 ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); }
162 // Write the serialized metadata to the file
163 virtual IOStatus
WriteMetadata(const Slice
& metadata
) override
{
164 assert(file_writer_
!= nullptr);
166 PutFixed32(&prefix
, static_cast<uint32_t>(metadata
.size()));
167 IOStatus io_s
= file_writer_
->Append(Slice(prefix
));
171 io_s
= file_writer_
->Append(metadata
);
175 // Write the serialized data to the file
176 virtual IOStatus
WritePacket(const Slice
& data
) override
{
177 assert(file_writer_
!= nullptr);
179 PutFixed32(&prefix
, static_cast<uint32_t>(data
.size()));
180 IOStatus io_s
= file_writer_
->Append(Slice(prefix
));
184 io_s
= file_writer_
->Append(data
);
189 virtual IOStatus
Close() override
{
190 file_writer_
.reset();
191 return IOStatus::OK();
195 std::unique_ptr
<WritableFileWriter
> file_writer_
;
198 // The default implementation of CacheDumpReader. It is implemented based on
199 // RandomAccessFileReader. Note that, we keep an internal variable to remember
200 // the current offset.
201 class FromFileCacheDumpReader
: public CacheDumpReader
{
203 explicit FromFileCacheDumpReader(
204 std::unique_ptr
<RandomAccessFileReader
>&& reader
)
205 : file_reader_(std::move(reader
)),
207 buffer_(new char[kDumpReaderBufferSize
]) {}
209 ~FromFileCacheDumpReader() { delete[] buffer_
; }
211 virtual IOStatus
ReadMetadata(std::string
* metadata
) override
{
212 uint32_t metadata_len
= 0;
213 IOStatus io_s
= ReadSizePrefix(&metadata_len
);
217 return Read(metadata_len
, metadata
);
220 virtual IOStatus
ReadPacket(std::string
* data
) override
{
221 uint32_t data_len
= 0;
222 IOStatus io_s
= ReadSizePrefix(&data_len
);
226 return Read(data_len
, data
);
230 IOStatus
ReadSizePrefix(uint32_t* len
) {
232 IOStatus io_s
= Read(kSizePrefixLen
, &prefix
);
236 Slice
encoded_slice(prefix
);
237 if (!GetFixed32(&encoded_slice
, len
)) {
238 return IOStatus::Corruption("Decode size prefix string failed");
240 return IOStatus::OK();
243 IOStatus
Read(size_t len
, std::string
* data
) {
244 assert(file_reader_
!= nullptr);
247 unsigned int bytes_to_read
= static_cast<unsigned int>(len
);
248 unsigned int to_read
= bytes_to_read
> kDumpReaderBufferSize
249 ? kDumpReaderBufferSize
252 while (to_read
> 0) {
253 io_s
= file_reader_
->Read(IOOptions(), offset_
, to_read
, &result_
,
255 Env::IO_TOTAL
/* rate_limiter_priority */);
259 if (result_
.size() < to_read
) {
260 return IOStatus::Corruption("Corrupted cache dump file.");
262 data
->append(result_
.data(), result_
.size());
265 bytes_to_read
-= to_read
;
266 to_read
= bytes_to_read
> kDumpReaderBufferSize
? kDumpReaderBufferSize
271 std::unique_ptr
<RandomAccessFileReader
> file_reader_
;
277 // The cache dump and load helper class
278 class CacheDumperHelper
{
280 // serialize the dump_unit_meta to a string, it is fixed 16 bytes size.
281 static void EncodeDumpUnitMeta(const DumpUnitMeta
& meta
, std::string
* data
) {
283 PutFixed32(data
, static_cast<uint32_t>(meta
.sequence_num
));
284 PutFixed32(data
, static_cast<uint32_t>(meta
.dump_unit_checksum
));
285 PutFixed64(data
, meta
.dump_unit_size
);
288 // Serialize the dump_unit to a string.
289 static void EncodeDumpUnit(const DumpUnit
& dump_unit
, std::string
* data
) {
291 PutFixed64(data
, dump_unit
.timestamp
);
292 data
->push_back(dump_unit
.type
);
293 PutLengthPrefixedSlice(data
, dump_unit
.key
);
294 PutFixed32(data
, static_cast<uint32_t>(dump_unit
.value_len
));
295 PutFixed32(data
, dump_unit
.value_checksum
);
296 PutLengthPrefixedSlice(data
,
297 Slice((char*)dump_unit
.value
, dump_unit
.value_len
));
300 // Deserialize the dump_unit_meta from a string
301 static Status
DecodeDumpUnitMeta(const std::string
& encoded_data
,
302 DumpUnitMeta
* unit_meta
) {
303 assert(unit_meta
!= nullptr);
304 Slice encoded_slice
= Slice(encoded_data
);
305 if (!GetFixed32(&encoded_slice
, &(unit_meta
->sequence_num
))) {
306 return Status::Incomplete("Decode dumped unit meta sequence_num failed");
308 if (!GetFixed32(&encoded_slice
, &(unit_meta
->dump_unit_checksum
))) {
309 return Status::Incomplete(
310 "Decode dumped unit meta dump_unit_checksum failed");
312 if (!GetFixed64(&encoded_slice
, &(unit_meta
->dump_unit_size
))) {
313 return Status::Incomplete(
314 "Decode dumped unit meta dump_unit_size failed");
319 // Deserialize the dump_unit from a string.
320 static Status
DecodeDumpUnit(const std::string
& encoded_data
,
321 DumpUnit
* dump_unit
) {
322 assert(dump_unit
!= nullptr);
323 Slice encoded_slice
= Slice(encoded_data
);
326 if (!GetFixed64(&encoded_slice
, &dump_unit
->timestamp
)) {
327 return Status::Incomplete("Decode dumped unit string failed");
329 // Decode the block type
330 dump_unit
->type
= static_cast<CacheDumpUnitType
>(encoded_slice
[0]);
331 encoded_slice
.remove_prefix(1);
333 if (!GetLengthPrefixedSlice(&encoded_slice
, &(dump_unit
->key
))) {
334 return Status::Incomplete("Decode dumped unit string failed");
336 // Decode the value size
338 if (!GetFixed32(&encoded_slice
, &value_len
)) {
339 return Status::Incomplete("Decode dumped unit string failed");
341 dump_unit
->value_len
= static_cast<size_t>(value_len
);
342 // Decode the value checksum
343 if (!GetFixed32(&encoded_slice
, &(dump_unit
->value_checksum
))) {
344 return Status::Incomplete("Decode dumped unit string failed");
346 // Decode the block content and copy to the memory space whose pointer
347 // will be managed by the cache finally.
349 if (!GetLengthPrefixedSlice(&encoded_slice
, &block
)) {
350 return Status::Incomplete("Decode dumped unit string failed");
352 dump_unit
->value
= (void*)block
.data();
353 assert(block
.size() == dump_unit
->value_len
);
358 } // namespace ROCKSDB_NAMESPACE
359 #endif // ROCKSDB_LITE