// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"

#include "file/random_access_file_reader.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/partitioned_index_iterator.h"

namespace ROCKSDB_NAMESPACE {
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

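  // Eagerly read the top-level index block when prefetching is requested or
  // the block cache is not used. If it was loaded into the cache but should
  // not stay pinned, drop our reference and let the cache own it.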
  CachableEntry<Block> index_block;
  if (prefetch || !use_cache) {
    const Status s =
        ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!s.ok()) {
      return s;
    }

    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));

  return Status::OK();
}

InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* disable_prefix_seek */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
  const bool no_io = (read_options.read_tier == kBlockCacheTier);
  CachableEntry<Block> index_block;
  const Status s =
      GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
                          get_context, lookup_context, &index_block);
  if (!s.ok()) {
    if (iter != nullptr) {
      iter->Invalidate(s);
      return iter;
    }

    return NewErrorInternalIterator<IndexValue>(s);
  }

  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;

  Statistics* kNullStats = nullptr;
  // Filters are already checked before seeking the index
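  // If CacheDependencies() pinned every partition into partition_map_, serve
  // the second level from the pinned blocks via a two-level iterator;
  // otherwise use PartitionedIndexIterator, which reads partitions on demand.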
  if (!partition_map_.empty()) {
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full()));
  } else {
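    // No pinned partition map is available, so build an iterator that reads
    // index partitions on demand, carrying over the relevant read options.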
    ReadOptions ro;
    ro.fill_cache = read_options.fill_cache;
    ro.deadline = read_options.deadline;
    ro.io_timeout = read_options.io_timeout;
    ro.adaptive_readahead = read_options.adaptive_readahead;
    ro.async_io = read_options.async_io;
    ro.rate_limiter_priority = read_options.rate_limiter_priority;

    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full()));

    it = new PartitionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }

  assert(it != nullptr);
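  // Transfer ownership of the (possibly cache-pinned) top-level index block
  // to the iterator so it remains valid for the iterator's lifetime.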
  index_block.TransferTo(it);

  return it;

  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
  // on-stack BlockIter while the state is on heap. Currently it assumes
  // the first level iter is always on heap and will attempt to delete it
  // in its destructor.
}
Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
                                               bool pin) {
  // Before reading the partitions, prefetch them to avoid lots of IOs
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;

  CachableEntry<Block> index_block;
  {
    Status s = GetOrReadIndexBlock(false /* no_io */, ro.rate_limiter_priority,
                                   nullptr /* get_context */, &lookup_context,
                                   &index_block);
    if (!s.ok()) {
      return s;
    }
  }

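  // Walk the top-level index with an on-stack iterator to find the byte range
  // spanned by the partition blocks so they can be prefetched in one read.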
  // We don't return pinned data from index blocks, so no need
  // to set `block_contents_pinned`.
  index_block.GetValue()->NewIndexIterator(
      internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t last_off =
      handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
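  // Prefetch the whole partition range in a single read. The buffer is
  // created with readahead disabled since the exact range is known up front.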
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  rep->CreateFilePrefetchBuffer(
      0, 0, &prefetch_buffer, false /*Implicit auto readahead*/,
      0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/);
  IOOptions opts;
  {
    Status s = rep->file->PrepareIOOptions(ro, opts);
    if (s.ok()) {
      s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                    static_cast<size_t>(prefetch_len),
                                    ro.rate_limiter_priority);
    }
    if (!s.ok()) {
      return s;
    }
  }

  // For saving "all or nothing" to partition_map_
  UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;

  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  size_t partition_count = 0;
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    ++partition_count;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    Status s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
        /*wait=*/true, /*for_compaction=*/false, &block, BlockType::kIndex,
        /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
        /*async_read=*/false);

    if (!s.ok()) {
      return s;
    }
    if (block.GetValue() != nullptr) {
      // Might need to "pin" some mmap-read blocks (GetOwnValue) if some
      // partitions are successfully compressed (cached) and some are not
      // compressed (mmap eligible)
      if (block.IsCached() || block.GetOwnValue()) {
        if (pin) {
          map_in_progress[handle.offset()] = std::move(block);
        }
      }
    }
  }
  Status s = biter.status();
  // Save (pin) them only if everything checks out
  if (map_in_progress.size() == partition_count && s.ok()) {
    std::swap(partition_map_, map_in_progress);
  }
  return s;
}

}  // namespace ROCKSDB_NAMESPACE