#include "db/range_tombstone_fragmenter.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
+#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h"
} // namespace
+const int kLoadConcurency = 128;
+
TableCache::TableCache(const ImmutableCFOptions& ioptions,
const FileOptions& file_options, Cache* const cache,
- BlockCacheTracer* const block_cache_tracer)
+ BlockCacheTracer* const block_cache_tracer,
+ const std::shared_ptr<IOTracer>& io_tracer)
: ioptions_(ioptions),
file_options_(file_options),
cache_(cache),
immortal_tables_(false),
- block_cache_tracer_(block_cache_tracer) {
+ block_cache_tracer_(block_cache_tracer),
+ loader_mutex_(kLoadConcurency, GetSliceNPHash64),
+ io_tracer_(io_tracer) {
if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
}
Status TableCache::GetTableReader(
- const FileOptions& file_options,
+ const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor, bool skip_filters, int level,
- bool prefetch_index_and_filter_in_cache) {
+ bool prefetch_index_and_filter_in_cache,
+ size_t max_file_size_for_l0_meta_pin) {
std::string fname =
TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
std::unique_ptr<FSRandomAccessFile> file;
- Status s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
- nullptr);
+ FileOptions fopts = file_options;
+ Status s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
+ if (s.ok()) {
+ s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
+ }
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
if (s.IsPathNotFound()) {
fname = Rocks2LevelTableFileName(fname);
- s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, nullptr);
+ s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
+ if (s.ok()) {
+ s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
+ nullptr);
+ }
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
}
StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
- std::move(file), fname, ioptions_.env,
+ std::move(file), fname, ioptions_.env, io_tracer_,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
+ ro,
TableReaderOptions(ioptions_, prefix_extractor, file_options,
internal_comparator, skip_filters, immortal_tables_,
- level, fd.largest_seqno, block_cache_tracer_),
+ false /* force_direct_prefetch */, level,
+ fd.largest_seqno, block_cache_tracer_,
+ max_file_size_for_l0_meta_pin),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
cache_->Erase(key);
}
-Status TableCache::FindTable(const FileOptions& file_options,
+Status TableCache::FindTable(const ReadOptions& ro,
+ const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const SliceTransform* prefix_extractor,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters,
- int level,
- bool prefetch_index_and_filter_in_cache) {
+ int level, bool prefetch_index_and_filter_in_cache,
+ size_t max_file_size_for_l0_meta_pin) {
PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
- Status s;
uint64_t number = fd.GetNumber();
Slice key = GetSliceForFileNumber(&number);
*handle = cache_->Lookup(key);
const_cast<bool*>(&no_io));
if (*handle == nullptr) {
- if (no_io) { // Don't do IO and return a not-found status
+ if (no_io) {
return Status::Incomplete("Table not found in table_cache, no_io is set");
}
+ MutexLock load_lock(loader_mutex_.get(key));
+    // We check the cache again under the loading mutex
+ *handle = cache_->Lookup(key);
+ if (*handle != nullptr) {
+ return Status::OK();
+ }
+
std::unique_ptr<TableReader> table_reader;
- s = GetTableReader(file_options, internal_comparator, fd,
- false /* sequential mode */, record_read_stats,
- file_read_hist, &table_reader, prefix_extractor,
- skip_filters, level, prefetch_index_and_filter_in_cache);
+ Status s = GetTableReader(
+ ro, file_options, internal_comparator, fd, false /* sequential mode */,
+ record_read_stats, file_read_hist, &table_reader, prefix_extractor,
+ skip_filters, level, prefetch_index_and_filter_in_cache,
+ max_file_size_for_l0_meta_pin);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
table_reader.release();
}
}
+ return s;
}
- return s;
+ return Status::OK();
}
InternalIterator* TableCache::NewIterator(
RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
+ size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
- const InternalKey* largest_compaction_key) {
+ const InternalKey* largest_compaction_key, bool allow_unprepared_value) {
PERF_TIMER_GUARD(new_table_iterator_nanos);
Status s;
auto& fd = file_meta.fd;
table_reader = fd.table_reader;
if (table_reader == nullptr) {
- s = FindTable(file_options, icomparator, fd, &handle, prefix_extractor,
- options.read_tier == kBlockCacheTier /* no_io */,
- !for_compaction /* record_read_stats */, file_read_hist,
- skip_filters, level);
+ s = FindTable(
+ options, file_options, icomparator, fd, &handle, prefix_extractor,
+ options.read_tier == kBlockCacheTier /* no_io */,
+ !for_compaction /* record_read_stats */, file_read_hist, skip_filters,
+ level, true /* prefetch_index_and_filter_in_cache */,
+ max_file_size_for_l0_meta_pin);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(handle);
}
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, caller,
- file_options.compaction_readahead_size);
+ file_options.compaction_readahead_size,
+ allow_unprepared_value);
}
if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (t == nullptr) {
- s = FindTable(file_options_, internal_comparator, fd, &handle);
+ s = FindTable(options, file_options_, internal_comparator, fd, &handle);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
  // Maybe we can include the whole file if snapshot == fd.largest_seqno.
if (options.snapshot != nullptr &&
(get_context->has_callback() ||
- static_cast_with_check<const SnapshotImpl, const Snapshot>(
- options.snapshot)
+ static_cast_with_check<const SnapshotImpl>(options.snapshot)
->GetSequenceNumber() <= fd.largest_seqno)) {
// We should consider to use options.snapshot->GetSequenceNumber()
// instead of GetInternalKeySeqno(k), which will make the code
GetContext* get_context,
const SliceTransform* prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters,
- int level) {
+ int level, size_t max_file_size_for_l0_meta_pin) {
auto& fd = file_meta.fd;
std::string* row_cache_entry = nullptr;
bool done = false;
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
- if (!done && s.ok()) {
+ if (!done) {
+ assert(s.ok());
if (t == nullptr) {
- s = FindTable(
- file_options_, internal_comparator, fd, &handle, prefix_extractor,
- options.read_tier == kBlockCacheTier /* no_io */,
- true /* record_read_stats */, file_read_hist, skip_filters, level);
+ s = FindTable(options, file_options_, internal_comparator, fd, &handle,
+ prefix_extractor,
+ options.read_tier == kBlockCacheTier /* no_io */,
+ true /* record_read_stats */, file_read_hist, skip_filters,
+ level, true /* prefetch_index_and_filter_in_cache */,
+ max_file_size_for_l0_meta_pin);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
- ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
- &DeleteEntry<std::string>);
+ // If row cache is full, it's OK to continue.
+ ioptions_.row_cache
+ ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+ &DeleteEntry<std::string>)
+ .PermitUncheckedError();
}
#endif // ROCKSDB_LITE
for (auto miter = table_range.begin(); miter != table_range.end();
++miter) {
- const Slice& user_key = miter->ukey;
- ;
+ const Slice& user_key = miter->ukey_with_ts;
+
GetContext* get_context = miter->get_context;
if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
if (s.ok() && !table_range.empty()) {
if (t == nullptr) {
s = FindTable(
- file_options_, internal_comparator, fd, &handle, prefix_extractor,
- options.read_tier == kBlockCacheTier /* no_io */,
+ options, file_options_, internal_comparator, fd, &handle,
+ prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters, level);
TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
if (s.ok()) {
++iter) {
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
- *max_covering_tombstone_seq =
- std::max(*max_covering_tombstone_seq,
- range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey));
+ *max_covering_tombstone_seq = std::max(
+ *max_covering_tombstone_seq,
+ range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
}
}
}
for (auto miter = table_range.begin(); miter != table_range.end();
++miter) {
std::string& row_cache_entry = row_cache_entries[row_idx++];
- const Slice& user_key = miter->ukey;
+ const Slice& user_key = miter->ukey_with_ts;
;
GetContext* get_context = miter->get_context;
size_t charge =
row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(row_cache_entry));
- ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
- &DeleteEntry<std::string>);
+ // If row cache is full, it's OK.
+ ioptions_.row_cache
+ ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+ &DeleteEntry<std::string>)
+ .PermitUncheckedError();
}
}
}
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
std::shared_ptr<const TableProperties>* properties,
const SliceTransform* prefix_extractor, bool no_io) {
- Status s;
auto table_reader = fd.table_reader;
// table already been pre-loaded?
if (table_reader) {
*properties = table_reader->GetTableProperties();
- return s;
+ return Status::OK();
}
Cache::Handle* table_handle = nullptr;
- s = FindTable(file_options, internal_comparator, fd, &table_handle,
- prefix_extractor, no_io);
+ Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
+ &table_handle, prefix_extractor, no_io);
if (!s.ok()) {
return s;
}
const FileOptions& file_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
const SliceTransform* prefix_extractor) {
- Status s;
auto table_reader = fd.table_reader;
// table already been pre-loaded?
if (table_reader) {
}
Cache::Handle* table_handle = nullptr;
- s = FindTable(file_options, internal_comparator, fd, &table_handle,
- prefix_extractor, true);
+ Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
+ &table_handle, prefix_extractor, true);
if (!s.ok()) {
return 0;
}
Cache::Handle* table_handle = nullptr;
if (table_reader == nullptr) {
const bool for_compaction = (caller == TableReaderCaller::kCompaction);
- Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
- prefix_extractor, false /* no_io */,
+ Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
+ &table_handle, prefix_extractor, false /* no_io */,
!for_compaction /* record_read_stats */);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(table_handle);
Cache::Handle* table_handle = nullptr;
if (table_reader == nullptr) {
const bool for_compaction = (caller == TableReaderCaller::kCompaction);
- Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
- prefix_extractor, false /* no_io */,
+ Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
+ &table_handle, prefix_extractor, false /* no_io */,
!for_compaction /* record_read_stats */);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(table_handle);