#include "db/table_cache.h"
#include "db/dbformat.h"
+#include "db/range_tombstone_fragmenter.h"
+#include "db/snapshot_impl.h"
#include "db/version_edit.h"
-#include "util/filename.h"
-
+#include "file/file_util.h"
+#include "file/filename.h"
+#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/statistics.h"
+#include "table/block_based/block_based_table_reader.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
+#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/table_reader.h"
+#include "test_util/sync_point.h"
+#include "util/cast_util.h"
#include "util/coding.h"
-#include "util/file_reader_writer.h"
#include "util/stop_watch.h"
-#include "util/sync_point.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
namespace {
static void UnrefEntry(void* arg1, void* arg2) {
  Cache* cache = reinterpret_cast<Cache*>(arg1);
  Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
  cache->Release(h);
}
-static void DeleteTableReader(void* arg1, void* /*arg2*/) {
- TableReader* table_reader = reinterpret_cast<TableReader*>(arg1);
- delete table_reader;
-}
-
static Slice GetSliceForFileNumber(const uint64_t* file_number) {
return Slice(reinterpret_cast<const char*>(file_number),
sizeof(*file_number));
}
}  // namespace
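+// Number of mutex stripes in loader_mutex_; loads of different files hash to
+// different stripes, so concurrent opens rarely contend on the same lock.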
+const int kLoadConcurency = 128;
+
TableCache::TableCache(const ImmutableCFOptions& ioptions,
- const EnvOptions& env_options, Cache* const cache)
+ const FileOptions& file_options, Cache* const cache,
+ BlockCacheTracer* const block_cache_tracer,
+ const std::shared_ptr<IOTracer>& io_tracer)
: ioptions_(ioptions),
- env_options_(env_options),
+ file_options_(file_options),
cache_(cache),
- immortal_tables_(false) {
+ immortal_tables_(false),
+ block_cache_tracer_(block_cache_tracer),
+ loader_mutex_(kLoadConcurency, GetSliceNPHash64),
+ io_tracer_(io_tracer) {
  if (ioptions_.row_cache) {
    // If the same cache is shared by multiple instances, we need to
    // disambiguate its entries.
    PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId());
  }
}
Status TableCache::GetTableReader(
- const EnvOptions& env_options,
+ const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
- bool sequential_mode, size_t readahead, bool record_read_stats,
- HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
+ bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
+ std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor, bool skip_filters, int level,
- bool prefetch_index_and_filter_in_cache, bool for_compaction) {
+ bool prefetch_index_and_filter_in_cache,
+ size_t max_file_size_for_l0_meta_pin) {
std::string fname =
TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
- unique_ptr<RandomAccessFile> file;
- Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
-
- RecordTick(ioptions_.statistics, NO_FILE_OPENS);
+ std::unique_ptr<FSRandomAccessFile> file;
+ FileOptions fopts = file_options;
+ Status s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
if (s.ok()) {
- if (readahead > 0 && !env_options.use_mmap_reads) {
- // Not compatible with mmap files since ReadaheadRandomAccessFile requires
- // its wrapped file's Read() to copy data into the provided scratch
- // buffer, which mmap files don't use.
- // TODO(ajkr): try madvise for mmap files in place of buffered readahead.
- file = NewReadaheadRandomAccessFile(std::move(file), readahead);
+ s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
+ }
+ RecordTick(ioptions_.statistics, NO_FILE_OPENS);
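+  // If the file is not found, retry below with the legacy LevelDB-style
+  // table file name, so files written under the old naming scheme can still
+  // be opened.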
+ if (s.IsPathNotFound()) {
+ fname = Rocks2LevelTableFileName(fname);
+ s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
+ if (s.ok()) {
+      s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
+    }
+ RecordTick(ioptions_.statistics, NO_FILE_OPENS);
+ }
+
+ if (s.ok()) {
if (!sequential_mode && ioptions_.advise_random_on_open) {
- file->Hint(RandomAccessFile::RANDOM);
+ file->Hint(FSRandomAccessFile::kRandom);
}
StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
- std::move(file), fname, ioptions_.env,
+ std::move(file), fname, ioptions_.env, io_tracer_,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
- file_read_hist, ioptions_.rate_limiter, for_compaction));
+ file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
- TableReaderOptions(ioptions_, prefix_extractor, env_options,
+ ro,
+ TableReaderOptions(ioptions_, prefix_extractor, file_options,
internal_comparator, skip_filters, immortal_tables_,
- level, fd.largest_seqno),
+ false /* force_direct_prefetch */, level,
+ fd.largest_seqno, block_cache_tracer_,
+ max_file_size_for_l0_meta_pin),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
    TEST_SYNC_POINT("TableCache::GetTableReader:0");
  }
  return s;
}

void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
  ReleaseHandle(handle);
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  cache_->Erase(key);
}
-Status TableCache::FindTable(const EnvOptions& env_options,
+Status TableCache::FindTable(const ReadOptions& ro,
+ const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const SliceTransform* prefix_extractor,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters,
- int level,
- bool prefetch_index_and_filter_in_cache) {
- PERF_TIMER_GUARD(find_table_nanos);
- Status s;
+ int level, bool prefetch_index_and_filter_in_cache,
+ size_t max_file_size_for_l0_meta_pin) {
+ PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
uint64_t number = fd.GetNumber();
Slice key = GetSliceForFileNumber(&number);
  *handle = cache_->Lookup(key);
  TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
                           const_cast<bool*>(&no_io));
if (*handle == nullptr) {
- if (no_io) { // Don't do IO and return a not-found status
+ if (no_io) {
return Status::Incomplete("Table not found in table_cache, no_io is set");
}
- unique_ptr<TableReader> table_reader;
- s = GetTableReader(env_options, internal_comparator, fd,
- false /* sequential mode */, 0 /* readahead */,
- record_read_stats, file_read_hist, &table_reader,
- prefix_extractor, skip_filters, level,
- prefetch_index_and_filter_in_cache);
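+    // Serialize loads of the same file: racing threads block on a per-key
+    // mutex stripe instead of opening duplicate table readers.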
+ MutexLock load_lock(loader_mutex_.get(key));
+    // Check the cache again under the loading mutex; another thread may
+    // have loaded the table while we waited for the lock.
+ *handle = cache_->Lookup(key);
+ if (*handle != nullptr) {
+ return Status::OK();
+ }
+
+ std::unique_ptr<TableReader> table_reader;
+ Status s = GetTableReader(
+ ro, file_options, internal_comparator, fd, false /* sequential mode */,
+ record_read_stats, file_read_hist, &table_reader, prefix_extractor,
+ skip_filters, level, prefetch_index_and_filter_in_cache,
+ max_file_size_for_l0_meta_pin);
    if (!s.ok()) {
      assert(table_reader == nullptr);
      RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
    } else {
      s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
                         handle);
      if (s.ok()) {
        // Release ownership of table reader.
        table_reader.release();
      }
}
+ return s;
}
- return s;
+ return Status::OK();
}
InternalIterator* TableCache::NewIterator(
- const ReadOptions& options, const EnvOptions& env_options,
+ const ReadOptions& options, const FileOptions& file_options,
const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
- bool for_compaction, Arena* arena, bool skip_filters, int level) {
+ TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
+ size_t max_file_size_for_l0_meta_pin,
+ const InternalKey* smallest_compaction_key,
+ const InternalKey* largest_compaction_key, bool allow_unprepared_value) {
PERF_TIMER_GUARD(new_table_iterator_nanos);
Status s;
- bool create_new_table_reader = false;
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;
if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}
- size_t readahead = 0;
- if (for_compaction) {
-#ifndef NDEBUG
- bool use_direct_reads_for_compaction = env_options.use_direct_reads;
- TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
- &use_direct_reads_for_compaction);
-#endif // !NDEBUG
- if (ioptions_.new_table_reader_for_compaction_inputs) {
- // get compaction_readahead_size from env_options allows us to set the
- // value dynamically
- readahead = env_options.compaction_readahead_size;
- create_new_table_reader = true;
- }
- } else {
- readahead = options.readahead_size;
- create_new_table_reader = readahead > 0;
- }
-
+ bool for_compaction = caller == TableReaderCaller::kCompaction;
auto& fd = file_meta.fd;
- if (create_new_table_reader) {
- unique_ptr<TableReader> table_reader_unique_ptr;
- s = GetTableReader(
- env_options, icomparator, fd, true /* sequential_mode */, readahead,
- !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
- prefix_extractor, false /* skip_filters */, level,
- true /* prefetch_index_and_filter_in_cache */, for_compaction);
+ table_reader = fd.table_reader;
+ if (table_reader == nullptr) {
+ s = FindTable(
+ options, file_options, icomparator, fd, &handle, prefix_extractor,
+ options.read_tier == kBlockCacheTier /* no_io */,
+ !for_compaction /* record_read_stats */, file_read_hist, skip_filters,
+ level, true /* prefetch_index_and_filter_in_cache */,
+ max_file_size_for_l0_meta_pin);
if (s.ok()) {
- table_reader = table_reader_unique_ptr.release();
- }
- } else {
- table_reader = fd.table_reader;
- if (table_reader == nullptr) {
- s = FindTable(env_options, icomparator, fd, &handle, prefix_extractor,
- options.read_tier == kBlockCacheTier /* no_io */,
- !for_compaction /* record read_stats */, file_read_hist,
- skip_filters, level);
- if (s.ok()) {
- table_reader = GetTableReaderFromHandle(handle);
- }
+ table_reader = GetTableReaderFromHandle(handle);
}
}
  InternalIterator* result = nullptr;
  if (s.ok()) {
    if (options.table_filter &&
        !options.table_filter(*table_reader->GetTableProperties())) {
      result = NewEmptyInternalIterator<Slice>(arena);
    } else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
- skip_filters, for_compaction);
+ skip_filters, caller,
+ file_options.compaction_readahead_size,
+ allow_unprepared_value);
}
- if (create_new_table_reader) {
- assert(handle == nullptr);
- result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
- } else if (handle != nullptr) {
+ if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
handle = nullptr; // prevent from releasing below
}
}
if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
if (range_del_agg->AddFile(fd.GetNumber())) {
- std::unique_ptr<InternalIterator> range_del_iter(
- table_reader->NewRangeTombstoneIterator(options));
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+ static_cast<FragmentedRangeTombstoneIterator*>(
+ table_reader->NewRangeTombstoneIterator(options)));
if (range_del_iter != nullptr) {
s = range_del_iter->status();
}
if (s.ok()) {
- s = range_del_agg->AddTombstones(
- std::move(range_del_iter),
- &file_meta.smallest,
- &file_meta.largest);
+ const InternalKey* smallest = &file_meta.smallest;
+ const InternalKey* largest = &file_meta.largest;
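+      // A compaction may use only part of this file; if so, clamp the
+      // tombstone range to the boundaries the compaction passed in.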
+ if (smallest_compaction_key != nullptr) {
+ smallest = smallest_compaction_key;
+ }
+ if (largest_compaction_key != nullptr) {
+ largest = largest_compaction_key;
+ }
+ range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
+ largest);
}
}
}
return result;
}
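+
+// Opens the table (through the cache if it is not already loaded) and
+// returns its fragmented range tombstone iterator in *out_iter; the result
+// may be nullptr if the table has no range tombstones.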
+Status TableCache::GetRangeTombstoneIterator(
+ const ReadOptions& options,
+ const InternalKeyComparator& internal_comparator,
+ const FileMetaData& file_meta,
+ std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter) {
+ const FileDescriptor& fd = file_meta.fd;
+ Status s;
+ TableReader* t = fd.table_reader;
+ Cache::Handle* handle = nullptr;
+ if (t == nullptr) {
+ s = FindTable(options, file_options_, internal_comparator, fd, &handle);
+ if (s.ok()) {
+ t = GetTableReaderFromHandle(handle);
+ }
+ }
+ if (s.ok()) {
+ out_iter->reset(t->NewRangeTombstoneIterator(options));
+ assert(out_iter);
+ }
+ return s;
+}
+
+#ifndef ROCKSDB_LITE
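+// Builds the per-file prefix of the row cache key:
+// row_cache_id ++ file_number ++ seq_no. GetFromRowCache() appends the user
+// key to complete it.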
+void TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
+ const FileDescriptor& fd,
+ const Slice& internal_key,
+ GetContext* get_context,
+ IterKey& row_cache_key) {
+ uint64_t fd_number = fd.GetNumber();
+ // We use the user key as cache key instead of the internal key,
+ // otherwise the whole cache would be invalidated every time the
+ // sequence key increases. However, to support caching snapshot
+ // reads, we append the sequence number (incremented by 1 to
+ // distinguish from 0) only in this case.
+ // If the snapshot is larger than the largest seqno in the file,
+ // all data should be exposed to the snapshot, so we treat it
+ // the same as there is no snapshot. The exception is that if
+ // a seq-checking callback is registered, some internal keys
+ // may still be filtered out.
+ uint64_t seq_no = 0;
+  // Maybe we can include the whole file if snapshot == fd.largest_seqno.
+ if (options.snapshot != nullptr &&
+ (get_context->has_callback() ||
+ static_cast_with_check<const SnapshotImpl>(options.snapshot)
+ ->GetSequenceNumber() <= fd.largest_seqno)) {
+    // We should consider using options.snapshot->GetSequenceNumber()
+    // instead of GetInternalKeySeqno(internal_key), which would make the
+    // code easier to understand.
+ seq_no = 1 + GetInternalKeySeqno(internal_key);
+ }
+
+ // Compute row cache key.
+ row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
+ row_cache_id_.size());
+ AppendVarint64(&row_cache_key, fd_number);
+ AppendVarint64(&row_cache_key, seq_no);
+}
+
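+// Looks up user_key in the row cache using the prefix already stored in
+// row_cache_key. On a hit, replays the cached log into get_context and
+// returns true; on a miss, returns false.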
+bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
+ size_t prefix_size, GetContext* get_context) {
+ bool found = false;
+
+ row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
+ if (auto row_handle =
+ ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
+ // Cleanable routine to release the cache entry
+ Cleanable value_pinner;
+ auto release_cache_entry_func = [](void* cache_to_clean,
+ void* cache_handle) {
+ ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
+ };
+ auto found_row_cache_entry =
+ static_cast<const std::string*>(ioptions_.row_cache->Value(row_handle));
+    // If we get here, the value is located in the cache.
+ // found_row_cache_entry points to the value on cache,
+ // and value_pinner has cleanup procedure for the cached entry.
+ // After replayGetContextLog() returns, get_context.pinnable_slice_
+ // will point to cache entry buffer (or a copy based on that) and
+ // cleanup routine under value_pinner will be delegated to
+ // get_context.pinnable_slice_. Cache entry is released when
+ // get_context.pinnable_slice_ is reset.
+ value_pinner.RegisterCleanup(release_cache_entry_func,
+ ioptions_.row_cache.get(), row_handle);
+ replayGetContextLog(*found_row_cache_entry, user_key, get_context,
+ &value_pinner);
+ RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
+ found = true;
+ } else {
+ RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
+ }
+ return found;
+}
+#endif // ROCKSDB_LITE
+
Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k,
GetContext* get_context,
const SliceTransform* prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters,
- int level) {
+ int level, size_t max_file_size_for_l0_meta_pin) {
auto& fd = file_meta.fd;
std::string* row_cache_entry = nullptr;
  bool done = false;
#ifndef ROCKSDB_LITE
  IterKey row_cache_key;
  std::string row_cache_entry_buffer;
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
- uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
- // We use the user key as cache key instead of the internal key,
- // otherwise the whole cache would be invalidated every time the
- // sequence key increases. However, to support caching snapshot
- // reads, we append the sequence number (incremented by 1 to
- // distinguish from 0) only in this case.
- uint64_t seq_no =
- options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
-
- // Compute row cache key.
- row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
- row_cache_id_.size());
- AppendVarint64(&row_cache_key, fd_number);
- AppendVarint64(&row_cache_key, seq_no);
- row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
- user_key.size());
-
- if (auto row_handle =
- ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
- // Cleanable routine to release the cache entry
- Cleanable value_pinner;
- auto release_cache_entry_func = [](void* cache_to_clean,
- void* cache_handle) {
- ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
- };
- auto found_row_cache_entry = static_cast<const std::string*>(
- ioptions_.row_cache->Value(row_handle));
- // If it comes here value is located on the cache.
- // found_row_cache_entry points to the value on cache,
- // and value_pinner has cleanup procedure for the cached entry.
- // After replayGetContextLog() returns, get_context.pinnable_slice_
- // will point to cache entry buffer (or a copy based on that) and
- // cleanup routine under value_pinner will be delegated to
- // get_context.pinnable_slice_. Cache entry is released when
- // get_context.pinnable_slice_ is reset.
- value_pinner.RegisterCleanup(release_cache_entry_func,
- ioptions_.row_cache.get(), row_handle);
- replayGetContextLog(*found_row_cache_entry, user_key, get_context,
- &value_pinner);
- RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
- done = true;
- } else {
- // Not found, setting up the replay log.
- RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
+    CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
+    done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
+                           get_context);
+ if (!done) {
row_cache_entry = &row_cache_entry_buffer;
}
  }
#endif  // ROCKSDB_LITE
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
- if (!done && s.ok()) {
+ if (!done) {
+ assert(s.ok());
if (t == nullptr) {
- s = FindTable(
- env_options_, internal_comparator, fd, &handle, prefix_extractor,
- options.read_tier == kBlockCacheTier /* no_io */,
- true /* record_read_stats */, file_read_hist, skip_filters, level);
+ s = FindTable(options, file_options_, internal_comparator, fd, &handle,
+ prefix_extractor,
+ options.read_tier == kBlockCacheTier /* no_io */,
+ true /* record_read_stats */, file_read_hist, skip_filters,
+ level, true /* prefetch_index_and_filter_in_cache */,
+ max_file_size_for_l0_meta_pin);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
}
- if (s.ok() && get_context->range_del_agg() != nullptr &&
+ SequenceNumber* max_covering_tombstone_seq =
+ get_context->max_covering_tombstone_seq();
+ if (s.ok() && max_covering_tombstone_seq != nullptr &&
!options.ignore_range_deletions) {
- std::unique_ptr<InternalIterator> range_del_iter(
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
- s = range_del_iter->status();
- }
- if (s.ok()) {
- s = get_context->range_del_agg()->AddTombstones(
- std::move(range_del_iter),
- &file_meta.smallest,
- &file_meta.largest);
+ *max_covering_tombstone_seq = std::max(
+ *max_covering_tombstone_seq,
+ range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
}
}
#ifndef ROCKSDB_LITE
  // Put the replay log in row cache only if something was found.
  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
- ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
- &DeleteEntry<std::string>);
+ // If row cache is full, it's OK to continue.
+ ioptions_.row_cache
+ ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+ &DeleteEntry<std::string>)
+ .PermitUncheckedError();
+ }
+#endif // ROCKSDB_LITE
+
+ if (handle != nullptr) {
+ ReleaseHandle(handle);
+ }
+ return s;
+}
+
+// Batched version of TableCache::Get.
+Status TableCache::MultiGet(const ReadOptions& options,
+ const InternalKeyComparator& internal_comparator,
+ const FileMetaData& file_meta,
+ const MultiGetContext::Range* mget_range,
+ const SliceTransform* prefix_extractor,
+ HistogramImpl* file_read_hist, bool skip_filters,
+ int level) {
+ auto& fd = file_meta.fd;
+ Status s;
+ TableReader* t = fd.table_reader;
+ Cache::Handle* handle = nullptr;
+ MultiGetRange table_range(*mget_range, mget_range->begin(),
+ mget_range->end());
+#ifndef ROCKSDB_LITE
+ autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
+ IterKey row_cache_key;
+ size_t row_cache_key_prefix_size = 0;
+ KeyContext& first_key = *table_range.begin();
+ bool lookup_row_cache =
+ ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();
+
+ // Check row cache if enabled. Since row cache does not currently store
+ // sequence numbers, we cannot use it if we need to fetch the sequence.
+ if (lookup_row_cache) {
+ GetContext* first_context = first_key.get_context;
+ CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
+ row_cache_key);
+ row_cache_key_prefix_size = row_cache_key.Size();
+
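+    // Probe the row cache for every key in the batch: hits are removed from
+    // table_range, misses get a replay log attached so their results can be
+    // cached after the table lookup.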
+ for (auto miter = table_range.begin(); miter != table_range.end();
+ ++miter) {
+ const Slice& user_key = miter->ukey_with_ts;
+
+ GetContext* get_context = miter->get_context;
+
+ if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
+ get_context)) {
+ table_range.SkipKey(miter);
+ } else {
+ row_cache_entries.emplace_back();
+ get_context->SetReplayLog(&(row_cache_entries.back()));
+ }
+ }
+ }
+#endif // ROCKSDB_LITE
+
+  // Check that table_range is not empty. It's possible that all keys were
+  // found in the row cache, leaving the range empty.
+ if (s.ok() && !table_range.empty()) {
+ if (t == nullptr) {
+ s = FindTable(
+ options, file_options_, internal_comparator, fd, &handle,
+ prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
+ true /* record_read_stats */, file_read_hist, skip_filters, level);
+ TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
+ if (s.ok()) {
+ t = GetTableReaderFromHandle(handle);
+ assert(t);
+ }
+ }
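+    // Apply this file's range tombstones to every key still in the batch,
+    // recording the newest tombstone sequence number covering each key.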
+ if (s.ok() && !options.ignore_range_deletions) {
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+ t->NewRangeTombstoneIterator(options));
+ if (range_del_iter != nullptr) {
+ for (auto iter = table_range.begin(); iter != table_range.end();
+ ++iter) {
+ SequenceNumber* max_covering_tombstone_seq =
+ iter->get_context->max_covering_tombstone_seq();
+ *max_covering_tombstone_seq = std::max(
+ *max_covering_tombstone_seq,
+ range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
+ }
+ }
+ }
+ if (s.ok()) {
+ t->MultiGet(options, &table_range, prefix_extractor, skip_filters);
+ } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
+ for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
+ Status* status = iter->s;
+ if (status->IsIncomplete()) {
+ // Couldn't find Table in cache but treat as kFound if no_io set
+ iter->get_context->MarkKeyMayExist();
+ s = Status::OK();
+ }
+ }
+ }
+ }
+
+#ifndef ROCKSDB_LITE
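+  // Second pass: insert the replay logs collected above into the row cache
+  // for keys whose lookups produced something worth caching.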
+ if (lookup_row_cache) {
+ size_t row_idx = 0;
+
+ for (auto miter = table_range.begin(); miter != table_range.end();
+ ++miter) {
+ std::string& row_cache_entry = row_cache_entries[row_idx++];
+      const Slice& user_key = miter->ukey_with_ts;
+ GetContext* get_context = miter->get_context;
+
+ get_context->SetReplayLog(nullptr);
+ // Compute row cache key.
+ row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
+ user_key.size());
+ // Put the replay log in row cache only if something was found.
+ if (s.ok() && !row_cache_entry.empty()) {
+ size_t charge =
+ row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
+ void* row_ptr = new std::string(std::move(row_cache_entry));
+        // If row cache is full, it's OK to continue.
+ ioptions_.row_cache
+ ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+ &DeleteEntry<std::string>)
+ .PermitUncheckedError();
+ }
+ }
}
#endif // ROCKSDB_LITE
  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}
Status TableCache::GetTableProperties(
- const EnvOptions& env_options,
+ const FileOptions& file_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
std::shared_ptr<const TableProperties>* properties,
const SliceTransform* prefix_extractor, bool no_io) {
- Status s;
auto table_reader = fd.table_reader;
  // Has the table reader already been pre-loaded?
if (table_reader) {
*properties = table_reader->GetTableProperties();
- return s;
+ return Status::OK();
}
Cache::Handle* table_handle = nullptr;
- s = FindTable(env_options, internal_comparator, fd, &table_handle,
- prefix_extractor, no_io);
+ Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
+ &table_handle, prefix_extractor, no_io);
if (!s.ok()) {
return s;
}
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  *properties = table->GetTableProperties();
  ReleaseHandle(table_handle);
  return s;
}
size_t TableCache::GetMemoryUsageByTableReader(
- const EnvOptions& env_options,
+ const FileOptions& file_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
const SliceTransform* prefix_extractor) {
- Status s;
auto table_reader = fd.table_reader;
  // Has the table reader already been pre-loaded?
  if (table_reader) {
    return table_reader->ApproximateMemoryUsage();
  }
Cache::Handle* table_handle = nullptr;
- s = FindTable(env_options, internal_comparator, fd, &table_handle,
- prefix_extractor, true);
+ Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
+ &table_handle, prefix_extractor, true);
if (!s.ok()) {
return 0;
  }
  auto table = GetTableReaderFromHandle(table_handle);
  auto ret = table->ApproximateMemoryUsage();
  ReleaseHandle(table_handle);
  return ret;
}

void TableCache::Evict(Cache* cache, uint64_t file_number) {
cache->Erase(GetSliceForFileNumber(&file_number));
}
-} // namespace rocksdb
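+// Returns the approximate byte offset of `key` within the table file,
+// opening the table reader through the cache if it is not already loaded.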
+uint64_t TableCache::ApproximateOffsetOf(
+ const Slice& key, const FileDescriptor& fd, TableReaderCaller caller,
+ const InternalKeyComparator& internal_comparator,
+ const SliceTransform* prefix_extractor) {
+ uint64_t result = 0;
+ TableReader* table_reader = fd.table_reader;
+ Cache::Handle* table_handle = nullptr;
+ if (table_reader == nullptr) {
+ const bool for_compaction = (caller == TableReaderCaller::kCompaction);
+ Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
+ &table_handle, prefix_extractor, false /* no_io */,
+ !for_compaction /* record_read_stats */);
+ if (s.ok()) {
+ table_reader = GetTableReaderFromHandle(table_handle);
+ }
+ }
+
+ if (table_reader != nullptr) {
+ result = table_reader->ApproximateOffsetOf(key, caller);
+ }
+ if (table_handle != nullptr) {
+ ReleaseHandle(table_handle);
+ }
+
+ return result;
+}
+
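+// Returns the approximate number of bytes occupied by the key range
+// [start, end] in the table file, opening the table reader through the
+// cache if it is not already loaded.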
+uint64_t TableCache::ApproximateSize(
+ const Slice& start, const Slice& end, const FileDescriptor& fd,
+ TableReaderCaller caller, const InternalKeyComparator& internal_comparator,
+ const SliceTransform* prefix_extractor) {
+ uint64_t result = 0;
+ TableReader* table_reader = fd.table_reader;
+ Cache::Handle* table_handle = nullptr;
+ if (table_reader == nullptr) {
+ const bool for_compaction = (caller == TableReaderCaller::kCompaction);
+ Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
+ &table_handle, prefix_extractor, false /* no_io */,
+ !for_compaction /* record_read_stats */);
+ if (s.ok()) {
+ table_reader = GetTableReaderFromHandle(table_handle);
+ }
+ }
+
+ if (table_reader != nullptr) {
+ result = table_reader->ApproximateSize(start, end, caller);
+ }
+ if (table_handle != nullptr) {
+ ReleaseHandle(table_handle);
+ }
+
+ return result;
+}
+} // namespace ROCKSDB_NAMESPACE