]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/table_cache.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / table_cache.cc
index 411959a33e16f8daedae6baac669e69309d7b884..c47d62891c4e18e9ae2468c14b0edbe1e3e8d084 100644 (file)
@@ -13,6 +13,7 @@
 #include "db/range_tombstone_fragmenter.h"
 #include "db/snapshot_impl.h"
 #include "db/version_edit.h"
+#include "file/file_util.h"
 #include "file/filename.h"
 #include "file/random_access_file_reader.h"
 #include "monitoring/perf_context_imp.h"
@@ -62,14 +63,19 @@ void AppendVarint64(IterKey* key, uint64_t v) {
 
 }  // namespace
 
+const int kLoadConcurency = 128;
+
 TableCache::TableCache(const ImmutableCFOptions& ioptions,
                        const FileOptions& file_options, Cache* const cache,
-                       BlockCacheTracer* const block_cache_tracer)
+                       BlockCacheTracer* const block_cache_tracer,
+                       const std::shared_ptr<IOTracer>& io_tracer)
     : ioptions_(ioptions),
       file_options_(file_options),
       cache_(cache),
       immortal_tables_(false),
-      block_cache_tracer_(block_cache_tracer) {
+      block_cache_tracer_(block_cache_tracer),
+      loader_mutex_(kLoadConcurency, GetSliceNPHash64),
+      io_tracer_(io_tracer) {
   if (ioptions_.row_cache) {
     // If the same cache is shared by multiple instances, we need to
     // disambiguate its entries.
@@ -89,21 +95,29 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
 }
 
 Status TableCache::GetTableReader(
-    const FileOptions& file_options,
+    const ReadOptions& ro, const FileOptions& file_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
     bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
     std::unique_ptr<TableReader>* table_reader,
     const SliceTransform* prefix_extractor, bool skip_filters, int level,
-    bool prefetch_index_and_filter_in_cache) {
+    bool prefetch_index_and_filter_in_cache,
+    size_t max_file_size_for_l0_meta_pin) {
   std::string fname =
       TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
   std::unique_ptr<FSRandomAccessFile> file;
-  Status s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
-                                               nullptr);
+  FileOptions fopts = file_options;
+  Status s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
+  if (s.ok()) {
+    s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
+  }
   RecordTick(ioptions_.statistics, NO_FILE_OPENS);
   if (s.IsPathNotFound()) {
     fname = Rocks2LevelTableFileName(fname);
-    s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, nullptr);
+    s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
+    if (s.ok()) {
+      s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
+                                            nullptr);
+    }
     RecordTick(ioptions_.statistics, NO_FILE_OPENS);
   }
 
@@ -114,13 +128,16 @@ Status TableCache::GetTableReader(
     StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
     std::unique_ptr<RandomAccessFileReader> file_reader(
         new RandomAccessFileReader(
-            std::move(file), fname, ioptions_.env,
+            std::move(file), fname, ioptions_.env, io_tracer_,
             record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
             file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
     s = ioptions_.table_factory->NewTableReader(
+        ro,
         TableReaderOptions(ioptions_, prefix_extractor, file_options,
                            internal_comparator, skip_filters, immortal_tables_,
-                           level, fd.largest_seqno, block_cache_tracer_),
+                           false /* force_direct_prefetch */, level,
+                           fd.largest_seqno, block_cache_tracer_,
+                           max_file_size_for_l0_meta_pin),
         std::move(file_reader), fd.GetFileSize(), table_reader,
         prefetch_index_and_filter_in_cache);
     TEST_SYNC_POINT("TableCache::GetTableReader:0");
@@ -135,16 +152,16 @@ void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
   cache_->Erase(key);
 }
 
-Status TableCache::FindTable(const FileOptions& file_options,
+Status TableCache::FindTable(const ReadOptions& ro,
+                             const FileOptions& file_options,
                              const InternalKeyComparator& internal_comparator,
                              const FileDescriptor& fd, Cache::Handle** handle,
                              const SliceTransform* prefix_extractor,
                              const bool no_io, bool record_read_stats,
                              HistogramImpl* file_read_hist, bool skip_filters,
-                             int level,
-                             bool prefetch_index_and_filter_in_cache) {
+                             int level, bool prefetch_index_and_filter_in_cache,
+                             size_t max_file_size_for_l0_meta_pin) {
   PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
-  Status s;
   uint64_t number = fd.GetNumber();
   Slice key = GetSliceForFileNumber(&number);
   *handle = cache_->Lookup(key);
@@ -152,14 +169,22 @@ Status TableCache::FindTable(const FileOptions& file_options,
                            const_cast<bool*>(&no_io));
 
   if (*handle == nullptr) {
-    if (no_io) {  // Don't do IO and return a not-found status
+    if (no_io) {
       return Status::Incomplete("Table not found in table_cache, no_io is set");
     }
+    MutexLock load_lock(loader_mutex_.get(key));
+    // We check the cache again under loading mutex
+    *handle = cache_->Lookup(key);
+    if (*handle != nullptr) {
+      return Status::OK();
+    }
+
     std::unique_ptr<TableReader> table_reader;
-    s = GetTableReader(file_options, internal_comparator, fd,
-                       false /* sequential mode */, record_read_stats,
-                       file_read_hist, &table_reader, prefix_extractor,
-                       skip_filters, level, prefetch_index_and_filter_in_cache);
+    Status s = GetTableReader(
+        ro, file_options, internal_comparator, fd, false /* sequential mode */,
+        record_read_stats, file_read_hist, &table_reader, prefix_extractor,
+        skip_filters, level, prefetch_index_and_filter_in_cache,
+        max_file_size_for_l0_meta_pin);
     if (!s.ok()) {
       assert(table_reader == nullptr);
       RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
@@ -173,8 +198,9 @@ Status TableCache::FindTable(const FileOptions& file_options,
         table_reader.release();
       }
     }
+    return s;
   }
-  return s;
+  return Status::OK();
 }
 
 InternalIterator* TableCache::NewIterator(
@@ -183,8 +209,9 @@ InternalIterator* TableCache::NewIterator(
     RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor,
     TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
     TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
+    size_t max_file_size_for_l0_meta_pin,
     const InternalKey* smallest_compaction_key,
-    const InternalKey* largest_compaction_key) {
+    const InternalKey* largest_compaction_key, bool allow_unprepared_value) {
   PERF_TIMER_GUARD(new_table_iterator_nanos);
 
   Status s;
@@ -197,10 +224,12 @@ InternalIterator* TableCache::NewIterator(
   auto& fd = file_meta.fd;
   table_reader = fd.table_reader;
   if (table_reader == nullptr) {
-    s = FindTable(file_options, icomparator, fd, &handle, prefix_extractor,
-                  options.read_tier == kBlockCacheTier /* no_io */,
-                  !for_compaction /* record_read_stats */, file_read_hist,
-                  skip_filters, level);
+    s = FindTable(
+        options, file_options, icomparator, fd, &handle, prefix_extractor,
+        options.read_tier == kBlockCacheTier /* no_io */,
+        !for_compaction /* record_read_stats */, file_read_hist, skip_filters,
+        level, true /* prefetch_index_and_filter_in_cache */,
+        max_file_size_for_l0_meta_pin);
     if (s.ok()) {
       table_reader = GetTableReaderFromHandle(handle);
     }
@@ -213,7 +242,8 @@ InternalIterator* TableCache::NewIterator(
     } else {
       result = table_reader->NewIterator(options, prefix_extractor, arena,
                                    skip_filters, caller,
-                                   file_options.compaction_readahead_size);
+                                   file_options.compaction_readahead_size,
+                                   allow_unprepared_value);
     }
     if (handle != nullptr) {
       result->RegisterCleanup(&UnrefEntry, cache_, handle);
@@ -270,7 +300,7 @@ Status TableCache::GetRangeTombstoneIterator(
   TableReader* t = fd.table_reader;
   Cache::Handle* handle = nullptr;
   if (t == nullptr) {
-    s = FindTable(file_options_, internal_comparator, fd, &handle);
+    s = FindTable(options, file_options_, internal_comparator, fd, &handle);
     if (s.ok()) {
       t = GetTableReaderFromHandle(handle);
     }
@@ -303,8 +333,7 @@ void TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
   // Maybe we can include the whole file ifsnapshot == fd.largest_seqno.
   if (options.snapshot != nullptr &&
       (get_context->has_callback() ||
-       static_cast_with_check<const SnapshotImpl, const Snapshot>(
-           options.snapshot)
+       static_cast_with_check<const SnapshotImpl>(options.snapshot)
                ->GetSequenceNumber() <= fd.largest_seqno)) {
     // We should consider to use options.snapshot->GetSequenceNumber()
     // instead of GetInternalKeySeqno(k), which will make the code
@@ -361,7 +390,7 @@ Status TableCache::Get(const ReadOptions& options,
                        GetContext* get_context,
                        const SliceTransform* prefix_extractor,
                        HistogramImpl* file_read_hist, bool skip_filters,
-                       int level) {
+                       int level, size_t max_file_size_for_l0_meta_pin) {
   auto& fd = file_meta.fd;
   std::string* row_cache_entry = nullptr;
   bool done = false;
@@ -384,12 +413,15 @@ Status TableCache::Get(const ReadOptions& options,
   Status s;
   TableReader* t = fd.table_reader;
   Cache::Handle* handle = nullptr;
-  if (!done && s.ok()) {
+  if (!done) {
+    assert(s.ok());
     if (t == nullptr) {
-      s = FindTable(
-          file_options_, internal_comparator, fd, &handle, prefix_extractor,
-          options.read_tier == kBlockCacheTier /* no_io */,
-          true /* record_read_stats */, file_read_hist, skip_filters, level);
+      s = FindTable(options, file_options_, internal_comparator, fd, &handle,
+                    prefix_extractor,
+                    options.read_tier == kBlockCacheTier /* no_io */,
+                    true /* record_read_stats */, file_read_hist, skip_filters,
+                    level, true /* prefetch_index_and_filter_in_cache */,
+                    max_file_size_for_l0_meta_pin);
       if (s.ok()) {
         t = GetTableReaderFromHandle(handle);
       }
@@ -424,8 +456,11 @@ Status TableCache::Get(const ReadOptions& options,
     size_t charge =
         row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
     void* row_ptr = new std::string(std::move(*row_cache_entry));
-    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
-                                &DeleteEntry<std::string>);
+    // If row cache is full, it's OK to continue.
+    ioptions_.row_cache
+        ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+                 &DeleteEntry<std::string>)
+        .PermitUncheckedError();
   }
 #endif  // ROCKSDB_LITE
 
@@ -467,8 +502,8 @@ Status TableCache::MultiGet(const ReadOptions& options,
 
     for (auto miter = table_range.begin(); miter != table_range.end();
          ++miter) {
-      const Slice& user_key = miter->ukey;
-      ;
+      const Slice& user_key = miter->ukey_with_ts;
+
       GetContext* get_context = miter->get_context;
 
       if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
@@ -487,8 +522,8 @@ Status TableCache::MultiGet(const ReadOptions& options,
   if (s.ok() && !table_range.empty()) {
     if (t == nullptr) {
       s = FindTable(
-          file_options_, internal_comparator, fd, &handle, prefix_extractor,
-          options.read_tier == kBlockCacheTier /* no_io */,
+          options, file_options_, internal_comparator, fd, &handle,
+          prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
           true /* record_read_stats */, file_read_hist, skip_filters, level);
       TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
       if (s.ok()) {
@@ -504,9 +539,9 @@ Status TableCache::MultiGet(const ReadOptions& options,
              ++iter) {
           SequenceNumber* max_covering_tombstone_seq =
               iter->get_context->max_covering_tombstone_seq();
-          *max_covering_tombstone_seq =
-              std::max(*max_covering_tombstone_seq,
-                       range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey));
+          *max_covering_tombstone_seq = std::max(
+              *max_covering_tombstone_seq,
+              range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
         }
       }
     }
@@ -531,7 +566,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
     for (auto miter = table_range.begin(); miter != table_range.end();
          ++miter) {
       std::string& row_cache_entry = row_cache_entries[row_idx++];
-      const Slice& user_key = miter->ukey;
+      const Slice& user_key = miter->ukey_with_ts;
       ;
       GetContext* get_context = miter->get_context;
 
@@ -544,8 +579,11 @@ Status TableCache::MultiGet(const ReadOptions& options,
         size_t charge =
             row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
         void* row_ptr = new std::string(std::move(row_cache_entry));
-        ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
-                                    &DeleteEntry<std::string>);
+        // If row cache is full, it's OK.
+        ioptions_.row_cache
+            ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+                     &DeleteEntry<std::string>)
+            .PermitUncheckedError();
       }
     }
   }
@@ -562,18 +600,17 @@ Status TableCache::GetTableProperties(
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
     std::shared_ptr<const TableProperties>* properties,
     const SliceTransform* prefix_extractor, bool no_io) {
-  Status s;
   auto table_reader = fd.table_reader;
   // table already been pre-loaded?
   if (table_reader) {
     *properties = table_reader->GetTableProperties();
 
-    return s;
+    return Status::OK();
   }
 
   Cache::Handle* table_handle = nullptr;
-  s = FindTable(file_options, internal_comparator, fd, &table_handle,
-                prefix_extractor, no_io);
+  Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
+                       &table_handle, prefix_extractor, no_io);
   if (!s.ok()) {
     return s;
   }
@@ -588,7 +625,6 @@ size_t TableCache::GetMemoryUsageByTableReader(
     const FileOptions& file_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
     const SliceTransform* prefix_extractor) {
-  Status s;
   auto table_reader = fd.table_reader;
   // table already been pre-loaded?
   if (table_reader) {
@@ -596,8 +632,8 @@ size_t TableCache::GetMemoryUsageByTableReader(
   }
 
   Cache::Handle* table_handle = nullptr;
-  s = FindTable(file_options, internal_comparator, fd, &table_handle,
-                prefix_extractor, true);
+  Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
+                       &table_handle, prefix_extractor, true);
   if (!s.ok()) {
     return 0;
   }
@@ -621,8 +657,8 @@ uint64_t TableCache::ApproximateOffsetOf(
   Cache::Handle* table_handle = nullptr;
   if (table_reader == nullptr) {
     const bool for_compaction = (caller == TableReaderCaller::kCompaction);
-    Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
-                         prefix_extractor, false /* no_io */,
+    Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
+                         &table_handle, prefix_extractor, false /* no_io */,
                          !for_compaction /* record_read_stats */);
     if (s.ok()) {
       table_reader = GetTableReaderFromHandle(table_handle);
@@ -648,8 +684,8 @@ uint64_t TableCache::ApproximateSize(
   Cache::Handle* table_handle = nullptr;
   if (table_reader == nullptr) {
     const bool for_compaction = (caller == TableReaderCaller::kCompaction);
-    Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
-                         prefix_extractor, false /* no_io */,
+    Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
+                         &table_handle, prefix_extractor, false /* no_io */,
                          !for_compaction /* record_read_stats */);
     if (s.ok()) {
       table_reader = GetTableReaderFromHandle(table_handle);