1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/table_cache.h"
12 #include "db/dbformat.h"
13 #include "db/version_edit.h"
14 #include "util/filename.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/statistics.h"
18 #include "table/get_context.h"
19 #include "table/internal_iterator.h"
20 #include "table/iterator_wrapper.h"
21 #include "table/table_builder.h"
22 #include "table/table_reader.h"
23 #include "util/coding.h"
24 #include "util/file_reader_writer.h"
25 #include "util/stop_watch.h"
26 #include "util/sync_point.h"
// Cache callback that reinterprets the opaque `value` pointer as a T*.
// Later in this file it is handed to Cache::Insert as DeleteEntry<TableReader>
// and DeleteEntry<std::string>, so T is a template parameter.
// `key` is not referenced in the visible lines.
// NOTE(review): the template header and the statement that disposes of
// typed_value are not visible in this extract -- confirm against the full file.
33 static void DeleteEntry(const Slice
& key
, void* value
) {
34 T
* typed_value
= reinterpret_cast<T
*>(value
);
// Iterator cleanup callback. NewIterator() registers it with
// (cache_, handle), so arg1 is the Cache and arg2 is the Cache::Handle.
// NOTE(review): the statement that uses `cache` and `h` (presumably a
// release of the handle) is not visible in this extract -- confirm.
38 static void UnrefEntry(void* arg1
, void* arg2
) {
39 Cache
* cache
= reinterpret_cast<Cache
*>(arg1
);
40 Cache::Handle
* h
= reinterpret_cast<Cache::Handle
*>(arg2
);
// Iterator cleanup callback. NewIterator() registers it with
// (table_reader, nullptr), so arg1 is the TableReader and arg2 is not
// referenced in the visible lines.
// NOTE(review): the statement that disposes of table_reader is not visible
// in this extract -- confirm against the full file.
44 static void DeleteTableReader(void* arg1
, void* arg2
) {
45 TableReader
* table_reader
= reinterpret_cast<TableReader
*>(arg1
);
// Builds the cache key for a table file: a Slice constructed from the
// address of *file_number and a length of sizeof(*file_number).
// NOTE(review): presumably Slice does not copy the bytes, in which case the
// pointed-to number must outlive the returned Slice -- confirm.
49 static Slice
GetSliceForFileNumber(const uint64_t* file_number
) {
50 return Slice(reinterpret_cast<const char*>(file_number
),
51 sizeof(*file_number
));
// Appends `v`, encoded with EncodeVarint64, to the end of `key`.
// NOTE(review): the declaration of the scratch buffer `buf` is not visible
// in this extract.
56 void AppendVarint64(IterKey
* key
, uint64_t v
) {
// `ptr` is the value returned by EncodeVarint64; `ptr - buf` is used below as
// the number of bytes to append.
58 auto ptr
= EncodeVarint64(buf
, v
);
// Append at offset key->Size(), i.e. after the key's current contents.
59 key
->TrimAppend(key
->Size(), buf
, ptr
- buf
);
62 #endif // ROCKSDB_LITE
// Constructor: stores the column-family options, the env options and the
// table cache pointer in the corresponding members.
66 TableCache::TableCache(const ImmutableCFOptions
& ioptions
,
67 const EnvOptions
& env_options
, Cache
* const cache
)
68 : ioptions_(ioptions
), env_options_(env_options
), cache_(cache
) {
// Only when a row cache is configured: write the value returned by
// row_cache->NewId() into row_cache_id_ as a varint64. Get() later places
// row_cache_id_ at the front of every row cache key.
69 if (ioptions_
.row_cache
) {
70 // If the same cache is shared by multiple instances, we need to
71 // disambiguate its entries.
72 PutVarint64(&row_cache_id_
, ioptions_
.row_cache
->NewId());
// Destructor. NOTE(review): the body is not visible in this extract.
76 TableCache::~TableCache() {
// Returns the value cache_ stores for `handle`, reinterpreted as a
// TableReader*. FindTable() inserts TableReader pointers into cache_.
79 TableReader
* TableCache::GetTableReaderFromHandle(Cache::Handle
* handle
) {
80 return reinterpret_cast<TableReader
*>(cache_
->Value(handle
));
// Releases `handle` back to cache_ by forwarding to Cache::Release.
83 void TableCache::ReleaseHandle(Cache::Handle
* handle
) {
84 cache_
->Release(handle
);
// Opens the table file described by `fd` and asks the configured table
// factory to build a TableReader for it.
//
//   env_options          passed to NewRandomAccessFile and TableReaderOptions.
//   internal_comparator  passed to TableReaderOptions.
//   fd                   supplies the file number, path id and file size.
//   sequential_mode      when true, the RANDOM access hint is not issued.
//   readahead            passed to NewReadaheadRandomAccessFile.
//   record_read_stats    passed to the RandomAccessFileReader constructor.
//   file_read_hist       not referenced in the visible lines.
//   table_reader         out-parameter handed to NewTableReader.
//   skip_filters, level  passed to TableReaderOptions.
//   prefetch_index_and_filter_in_cache  forwarded to NewTableReader.
//
// NOTE(review): several original lines are missing from this extract (the
// `fname` declaration, the guards around the statements below, the tail of
// the RandomAccessFileReader constructor call, and the return), so the exact
// conditions under which each step runs cannot be confirmed here.
87 Status
TableCache::GetTableReader(
88 const EnvOptions
& env_options
,
89 const InternalKeyComparator
& internal_comparator
, const FileDescriptor
& fd
,
90 bool sequential_mode
, size_t readahead
, bool record_read_stats
,
91 HistogramImpl
* file_read_hist
, unique_ptr
<TableReader
>* table_reader
,
92 bool skip_filters
, int level
, bool prefetch_index_and_filter_in_cache
) {
// File name built from the DB paths, the file number and the path id;
// presumably assigned to `fname`, which is used in the open call below.
94 TableFileName(ioptions_
.db_paths
, fd
.GetNumber(), fd
.GetPathId());
95 unique_ptr
<RandomAccessFile
> file
;
// Open the file through the configured Env.
96 Status s
= ioptions_
.env
->NewRandomAccessFile(fname
, &file
, env_options
);
// Bump the NO_FILE_OPENS ticker.
98 RecordTick(ioptions_
.statistics
, NO_FILE_OPENS
);
// Replace `file` with a readahead wrapper around it.
// NOTE(review): the condition guarding this wrap is not visible here.
101 file
= NewReadaheadRandomAccessFile(std::move(file
), readahead
);
// Issue the RANDOM access hint only for non-sequential opens, and only when
// advise_random_on_open is set.
103 if (!sequential_mode
&& ioptions_
.advise_random_on_open
) {
104 file
->Hint(RandomAccessFile::RANDOM
);
// Stopwatch tied to the TABLE_OPEN_IO_MICROS statistic.
106 StopWatch
sw(ioptions_
.env
, ioptions_
.statistics
, TABLE_OPEN_IO_MICROS
);
// Ownership of `file` moves into the RandomAccessFileReader.
107 std::unique_ptr
<RandomAccessFileReader
> file_reader(
108 new RandomAccessFileReader(std::move(file
), ioptions_
.env
,
109 ioptions_
.statistics
, record_read_stats
,
// Build the reader; ownership of `file_reader` moves into the factory call
// and the result is written through `table_reader`.
111 s
= ioptions_
.table_factory
->NewTableReader(
112 TableReaderOptions(ioptions_
, env_options
, internal_comparator
,
113 skip_filters
, level
),
114 std::move(file_reader
), fd
.GetFileSize(), table_reader
,
115 prefetch_index_and_filter_in_cache
);
116 TEST_SYNC_POINT("TableCache::GetTableReader:0");
// Releases `handle`, then computes the cache key for fd's file number.
// NOTE(review): the statement that uses `key` (presumably an erase from
// cache_) is not visible in this extract -- confirm against the full file.
121 void TableCache::EraseHandle(const FileDescriptor
& fd
, Cache::Handle
* handle
) {
122 ReleaseHandle(handle
);
// `key` is a Slice built from the address of the local `number`.
123 uint64_t number
= fd
.GetNumber();
124 Slice key
= GetSliceForFileNumber(&number
);
// Looks up the TableReader for `fd` in cache_, keyed by the file number, and
// writes the resulting handle to *handle. On a miss it either returns
// Status::Incomplete (when no_io is set) or opens the table through
// GetTableReader() and inserts the new reader into cache_.
//
// NOTE(review): the `level` parameter (used in the GetTableReader call
// below), the declaration of `s`, the guards separating the error path from
// the insert path, and the return are not visible in this extract.
128 Status
TableCache::FindTable(const EnvOptions
& env_options
,
129 const InternalKeyComparator
& internal_comparator
,
130 const FileDescriptor
& fd
, Cache::Handle
** handle
,
131 const bool no_io
, bool record_read_stats
,
132 HistogramImpl
* file_read_hist
, bool skip_filters
,
134 bool prefetch_index_and_filter_in_cache
) {
135 PERF_TIMER_GUARD(find_table_nanos
);
// The cache key is a Slice over the local `number`.
137 uint64_t number
= fd
.GetNumber();
138 Slice key
= GetSliceForFileNumber(&number
);
139 *handle
= cache_
->Lookup(key
);
// The sync-point callback receives a non-const pointer to no_io.
140 TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
141 const_cast<bool*>(&no_io
));
// Cache miss.
143 if (*handle
== nullptr) {
144 if (no_io
) { // Don't do IO and return a not-found status
145 return Status::Incomplete("Table not found in table_cache, no_io is set");
147 unique_ptr
<TableReader
> table_reader
;
// Open the table in non-sequential mode with no readahead.
148 s
= GetTableReader(env_options
, internal_comparator
, fd
,
149 false /* sequential mode */, 0 /* readahead */,
150 record_read_stats
, file_read_hist
, &table_reader
,
151 skip_filters
, level
, prefetch_index_and_filter_in_cache
);
// Presumably the failure branch: no reader is expected to have been
// produced, and the NO_FILE_ERRORS ticker is bumped.
153 assert(table_reader
== nullptr);
154 RecordTick(ioptions_
.statistics
, NO_FILE_ERRORS
);
155 // We do not cache error results so that if the error is transient,
156 // or somebody repairs the file, we recover automatically.
// Presumably the success branch: insert the raw reader pointer under the
// file-number key, with DeleteEntry<TableReader> as the entry callback.
158 s
= cache_
->Insert(key
, table_reader
.get(), 1, &DeleteEntry
<TableReader
>,
161 // Release ownership of table reader.
162 table_reader
.release();
// Creates an iterator over the table file described by `fd`.
//
// The TableReader comes from one of three places:
//   1. a freshly opened reader (create_new_table_reader), owned by the
//      iterator through a DeleteTableReader cleanup;
//   2. fd.table_reader when it is non-null;
//   3. the table cache via FindTable(), in which case the cache handle is
//      tied to the iterator through an UnrefEntry cleanup.
//
// When table_reader_ptr is non-null it is first set to nullptr and later set
// to the reader that was used. When range_del_agg is non-null and
// options.ignore_range_deletions is false, the table's range tombstones are
// added to the aggregator.
//
// NOTE(review): the declaration of `s` and several if/else guards are
// missing from this extract; comments marked "presumably" describe structure
// that is inferred and should be confirmed against the full file.
169 InternalIterator
* TableCache::NewIterator(
170 const ReadOptions
& options
, const EnvOptions
& env_options
,
171 const InternalKeyComparator
& icomparator
, const FileDescriptor
& fd
,
172 RangeDelAggregator
* range_del_agg
, TableReader
** table_reader_ptr
,
173 HistogramImpl
* file_read_hist
, bool for_compaction
, Arena
* arena
,
174 bool skip_filters
, int level
) {
175 PERF_TIMER_GUARD(new_table_iterator_nanos
);
178 bool create_new_table_reader
= false;
179 TableReader
* table_reader
= nullptr;
180 Cache::Handle
* handle
= nullptr;
// Clear the caller's out-parameter up front.
182 if (table_reader_ptr
!= nullptr) {
183 *table_reader_ptr
= nullptr;
185 size_t readahead
= 0;
186 if (for_compaction
) {
// Local copy of env_options.use_direct_reads, exposed to the sync-point
// callback below; it is not referenced again in the visible lines.
188 bool use_direct_reads_for_compaction
= env_options
.use_direct_reads
;
189 TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
190 &use_direct_reads_for_compaction
);
// Compaction inputs configured to use a dedicated reader: use the compaction
// readahead size and force a new table reader.
192 if (ioptions_
.new_table_reader_for_compaction_inputs
) {
193 readahead
= ioptions_
.compaction_readahead_size
;
194 create_new_table_reader
= true;
// Presumably the non-compaction branch: take readahead from the read options
// and open a new reader only when it is non-zero.
197 readahead
= options
.readahead_size
;
198 create_new_table_reader
= readahead
> 0;
201 if (create_new_table_reader
) {
202 unique_ptr
<TableReader
> table_reader_unique_ptr
;
// Arguments of the call that opens the dedicated reader (the call's first
// line is not visible here; the argument list matches GetTableReader).
204 env_options
, icomparator
, fd
, true /* sequential_mode */, readahead
,
205 !for_compaction
/* record stats */, nullptr, &table_reader_unique_ptr
,
206 false /* skip_filters */, level
);
// Take the raw pointer; a DeleteTableReader cleanup is registered on the
// iterator further down.
208 table_reader
= table_reader_unique_ptr
.release();
// Presumably the else branch: prefer the reader stored in fd and fall back
// to the table cache when it is null.
211 table_reader
= fd
.table_reader
;
212 if (table_reader
== nullptr) {
213 s
= FindTable(env_options
, icomparator
, fd
, &handle
,
214 options
.read_tier
== kBlockCacheTier
/* no_io */,
215 !for_compaction
/* record read_stats */, file_read_hist
,
216 skip_filters
, level
);
218 table_reader
= GetTableReaderFromHandle(handle
);
223 InternalIterator
* result
= nullptr;
225 result
= table_reader
->NewIterator(options
, arena
, skip_filters
);
// Attach to the iterator whichever resource was acquired above.
226 if (create_new_table_reader
) {
227 assert(handle
== nullptr);
228 result
->RegisterCleanup(&DeleteTableReader
, table_reader
, nullptr);
229 } else if (handle
!= nullptr) {
230 result
->RegisterCleanup(&UnrefEntry
, cache_
, handle
);
231 handle
= nullptr; // prevent from releasing below
234 if (for_compaction
) {
235 table_reader
->SetupForCompaction();
237 if (table_reader_ptr
!= nullptr) {
238 *table_reader_ptr
= table_reader
;
// Feed this table's range tombstones into the aggregator, when requested.
241 if (s
.ok() && range_del_agg
!= nullptr && !options
.ignore_range_deletions
) {
242 std::unique_ptr
<InternalIterator
> range_del_iter(
243 table_reader
->NewRangeTombstoneIterator(options
));
244 if (range_del_iter
!= nullptr) {
245 s
= range_del_iter
->status();
248 s
= range_del_agg
->AddTombstones(std::move(range_del_iter
));
// A handle that is still non-null here was not handed to the iterator above,
// so it is released now.
252 if (handle
!= nullptr) {
253 ReleaseHandle(handle
);
// Presumably the failure path: no iterator was created, so an error iterator
// carrying `s` is produced instead.
256 assert(result
== nullptr);
257 result
= NewErrorInternalIterator(s
, arena
);
// Point lookup of internal key `k` in the table file described by `fd`,
// reporting results through `get_context`.
//
// Visible flow:
//   1. When a row cache is configured and the context does not need the
//      sequence number, build a row cache key and look it up. On a hit the
//      stored entry is replayed into get_context; on a miss a replay log
//      buffer is set up to record this lookup.
//   2. Otherwise obtain a TableReader from fd.table_reader or FindTable(),
//      add the table's range tombstones to the context's aggregator when
//      applicable, and call TableReader::Get.
//   3. When the replay log is non-empty, insert it into the row cache.
//   4. Release the table cache handle if one was taken.
//
// NOTE(review): the declarations of `s`, `done`, `seq_no` and `charge`, and
// several guards and closing braces, are missing from this extract; comments
// marked "presumably" should be confirmed against the full file.
262 Status
TableCache::Get(const ReadOptions
& options
,
263 const InternalKeyComparator
& internal_comparator
,
264 const FileDescriptor
& fd
, const Slice
& k
,
265 GetContext
* get_context
, HistogramImpl
* file_read_hist
,
266 bool skip_filters
, int level
) {
// Stays nullptr unless the row cache lookup below misses.
267 std::string
* row_cache_entry
= nullptr;
270 IterKey row_cache_key
;
271 std::string row_cache_entry_buffer
;
272 // Check row cache if enabled. Since row cache does not currently store
273 // sequence numbers, we cannot use it if we need to fetch the sequence.
274 if (ioptions_
.row_cache
&& !get_context
->NeedToReadSequence()) {
275 uint64_t fd_number
= fd
.GetNumber();
276 auto user_key
= ExtractUserKey(k
);
277 // We use the user key as cache key instead of the internal key,
278 // otherwise the whole cache would be invalidated every time the
279 // sequence key increases. However, to support caching snapshot
280 // reads, we append the sequence number (incremented by 1 to
281 // distinguish from 0) only in this case.
// Presumably the initializer of `seq_no`, which is appended to the key below.
283 options
.snapshot
== nullptr ? 0 : 1 + GetInternalKeySeqno(k
);
285 // Compute row cache key.
// Appended in this order: row_cache_id_, file number, seq_no, user key.
286 row_cache_key
.TrimAppend(row_cache_key
.Size(), row_cache_id_
.data(),
287 row_cache_id_
.size());
288 AppendVarint64(&row_cache_key
, fd_number
);
289 AppendVarint64(&row_cache_key
, seq_no
);
290 row_cache_key
.TrimAppend(row_cache_key
.Size(), user_key
.data(),
// Row cache hit: replay the stored entry into get_context, release the
// handle and count the hit.
293 if (auto row_handle
=
294 ioptions_
.row_cache
->Lookup(row_cache_key
.GetUserKey())) {
295 auto found_row_cache_entry
= static_cast<const std::string
*>(
296 ioptions_
.row_cache
->Value(row_handle
));
297 replayGetContextLog(*found_row_cache_entry
, user_key
, get_context
);
298 ioptions_
.row_cache
->Release(row_handle
);
299 RecordTick(ioptions_
.statistics
, ROW_CACHE_HIT
);
302 // Not found, setting up the replay log.
303 RecordTick(ioptions_
.statistics
, ROW_CACHE_MISS
);
304 row_cache_entry
= &row_cache_entry_buffer
;
307 #endif // ROCKSDB_LITE
// Start from the reader stored in fd; presumably FindTable() below is used
// only when it is null.
309 TableReader
* t
= fd
.table_reader
;
310 Cache::Handle
* handle
= nullptr;
311 if (!done
&& s
.ok()) {
313 s
= FindTable(env_options_
, internal_comparator
, fd
, &handle
,
314 options
.read_tier
== kBlockCacheTier
/* no_io */,
315 true /* record_read_stats */, file_read_hist
, skip_filters
,
318 t
= GetTableReaderFromHandle(handle
);
// Add this table's range tombstones to the context's aggregator, when the
// context has one and range deletions are not ignored.
321 if (s
.ok() && get_context
->range_del_agg() != nullptr &&
322 !options
.ignore_range_deletions
) {
323 std::unique_ptr
<InternalIterator
> range_del_iter(
324 t
->NewRangeTombstoneIterator(options
));
325 if (range_del_iter
!= nullptr) {
326 s
= range_del_iter
->status();
329 s
= get_context
->range_del_agg()->AddTombstones(
330 std::move(range_del_iter
));
// The replay log is attached only for the duration of the table lookup and
// detached immediately afterwards.
334 get_context
->SetReplayLog(row_cache_entry
); // nullptr if no cache.
335 s
= t
->Get(options
, k
, get_context
, skip_filters
);
336 get_context
->SetReplayLog(nullptr);
337 } else if (options
.read_tier
== kBlockCacheTier
&& s
.IsIncomplete()) {
338 // Couldn't find Table in cache but treat as kFound if no_io set
339 get_context
->MarkKeyMayExist();
346 // Put the replay log in row cache only if something was found.
347 if (!done
&& s
.ok() && row_cache_entry
&& !row_cache_entry
->empty()) {
// Presumably the initializer of `charge`: key size + entry size +
// sizeof(std::string).
349 row_cache_key
.Size() + row_cache_entry
->size() + sizeof(std::string
);
// The log is moved into a heap-allocated string that is handed to the row
// cache together with DeleteEntry<std::string> as the entry callback.
350 void* row_ptr
= new std::string(std::move(*row_cache_entry
));
351 ioptions_
.row_cache
->Insert(row_cache_key
.GetUserKey(), row_ptr
, charge
,
352 &DeleteEntry
<std::string
>);
354 #endif // ROCKSDB_LITE
// Release the table cache handle when FindTable() supplied one.
356 if (handle
!= nullptr) {
357 ReleaseHandle(handle
);
// Writes the table properties of the file described by `fd` to *properties.
// Uses fd.table_reader when available (see the existing "pre-loaded"
// comment); otherwise finds the reader through FindTable(), forwarding
// `no_io`, and releases the cache handle after reading the properties.
// NOTE(review): the declaration of `s`, the guards and the return statements
// are not visible in this extract.
362 Status
TableCache::GetTableProperties(
363 const EnvOptions
& env_options
,
364 const InternalKeyComparator
& internal_comparator
, const FileDescriptor
& fd
,
365 std::shared_ptr
<const TableProperties
>* properties
, bool no_io
) {
367 auto table_reader
= fd
.table_reader
;
368 // table already been pre-loaded?
370 *properties
= table_reader
->GetTableProperties();
// Fallback: go through the table cache.
375 Cache::Handle
* table_handle
= nullptr;
376 s
= FindTable(env_options
, internal_comparator
, fd
, &table_handle
, no_io
);
380 assert(table_handle
);
381 auto table
= GetTableReaderFromHandle(table_handle
);
382 *properties
= table
->GetTableProperties();
// The handle is released once the properties have been read.
383 ReleaseHandle(table_handle
);
// Returns the approximate memory usage reported by the TableReader for `fd`.
// Uses fd.table_reader when available (see the existing "pre-loaded"
// comment); otherwise looks the reader up through FindTable() with `true`
// in the no_io position, so the fallback does not open the file on a miss.
// NOTE(review): the declaration of `s`, the guards and the handling of a
// failed lookup are not visible in this extract.
387 size_t TableCache::GetMemoryUsageByTableReader(
388 const EnvOptions
& env_options
,
389 const InternalKeyComparator
& internal_comparator
,
390 const FileDescriptor
& fd
) {
392 auto table_reader
= fd
.table_reader
;
393 // table already been pre-loaded?
395 return table_reader
->ApproximateMemoryUsage();
// Fallback: cache-only lookup.
398 Cache::Handle
* table_handle
= nullptr;
399 s
= FindTable(env_options
, internal_comparator
, fd
, &table_handle
, true);
403 assert(table_handle
);
404 auto table
= GetTableReaderFromHandle(table_handle
);
// Read the usage before releasing the handle.
405 auto ret
= table
->ApproximateMemoryUsage();
406 ReleaseHandle(table_handle
);
// Erases the entry for `file_number` from `cache`, using the same
// file-number key that FindTable() uses for lookups and inserts.
410 void TableCache::Evict(Cache
* cache
, uint64_t file_number
) {
// The key Slice is built from the by-value parameter `file_number`.
411 cache
->Erase(GetSliceForFileNumber(&file_number
));
414 } // namespace rocksdb