// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/table_cache.h"

#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/statistics.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/table_reader.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

namespace {

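// Deleter registered with the cache for entries it owns: TableReader objects
// in the table cache and std::string replay logs in the row cache.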
template <class T>
static void DeleteEntry(const Slice& /*key*/, void* value) {
  T* typed_value = reinterpret_cast<T*>(value);
  delete typed_value;
}

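// Iterator cleanup callback that releases the table cache handle pinned for
// the iterator's lifetime.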
static void UnrefEntry(void* arg1, void* arg2) {
  Cache* cache = reinterpret_cast<Cache*>(arg1);
  Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
  cache->Release(h);
}

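// Table cache entries are keyed by the file number's raw 8-byte representation.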
static Slice GetSliceForFileNumber(const uint64_t* file_number) {
  return Slice(reinterpret_cast<const char*>(file_number),
               sizeof(*file_number));
}

#ifndef ROCKSDB_LITE

void AppendVarint64(IterKey* key, uint64_t v) {
  char buf[10];
  auto ptr = EncodeVarint64(buf, v);
  key->TrimAppend(key->Size(), buf, ptr - buf);
}

#endif  // ROCKSDB_LITE

}  // namespace

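// Number of stripes in the keyed mutex used to serialize concurrent loads of
// the same table file into the cache (see loader_mutex_ in FindTable()).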
const int kLoadConcurency = 128;

TableCache::TableCache(const ImmutableCFOptions& ioptions,
                       const FileOptions& file_options, Cache* const cache,
                       BlockCacheTracer* const block_cache_tracer,
                       const std::shared_ptr<IOTracer>& io_tracer)
    : ioptions_(ioptions),
      file_options_(file_options),
      cache_(cache),
      immortal_tables_(false),
      block_cache_tracer_(block_cache_tracer),
      loader_mutex_(kLoadConcurency, GetSliceNPHash64),
      io_tracer_(io_tracer) {
  if (ioptions_.row_cache) {
    // If the same cache is shared by multiple instances, we need to
    // disambiguate its entries.
    PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId());
  }
}

TableCache::~TableCache() {}

TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) {
  return reinterpret_cast<TableReader*>(cache_->Value(handle));
}

void TableCache::ReleaseHandle(Cache::Handle* handle) {
  cache_->Release(handle);
}

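// Opens the SST file named by `fd` and builds a TableReader for it through
// the configured table factory. The reader is handed back to the caller; it
// is not inserted into the cache here (FindTable() takes care of that).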
Status TableCache::GetTableReader(
    const ReadOptions& ro, const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
    std::unique_ptr<TableReader>* table_reader,
    const SliceTransform* prefix_extractor, bool skip_filters, int level,
    bool prefetch_index_and_filter_in_cache,
    size_t max_file_size_for_l0_meta_pin) {
  std::string fname =
      TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
  std::unique_ptr<FSRandomAccessFile> file;
  FileOptions fopts = file_options;
  Status s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
  if (s.ok()) {
    s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
  }
  RecordTick(ioptions_.statistics, NO_FILE_OPENS);
  if (s.IsPathNotFound()) {
    fname = Rocks2LevelTableFileName(fname);
    s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options);
    if (s.ok()) {
      s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
                                            nullptr);
    }
    RecordTick(ioptions_.statistics, NO_FILE_OPENS);
  }

  if (s.ok()) {
    if (!sequential_mode && ioptions_.advise_random_on_open) {
      file->Hint(FSRandomAccessFile::kRandom);
    }
    StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
    std::unique_ptr<RandomAccessFileReader> file_reader(
        new RandomAccessFileReader(
            std::move(file), fname, ioptions_.env, io_tracer_,
            record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
            file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
    s = ioptions_.table_factory->NewTableReader(
        ro,
        TableReaderOptions(ioptions_, prefix_extractor, file_options,
                           internal_comparator, skip_filters, immortal_tables_,
                           false /* force_direct_prefetch */, level,
                           fd.largest_seqno, block_cache_tracer_,
                           max_file_size_for_l0_meta_pin),
        std::move(file_reader), fd.GetFileSize(), table_reader,
        prefetch_index_and_filter_in_cache);
    TEST_SYNC_POINT("TableCache::GetTableReader:0");
  }
  return s;
}

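// Releases `handle` and erases the file's entry from the table cache.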
void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
  ReleaseHandle(handle);
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  cache_->Erase(key);
}

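// Looks up the TableReader for `fd` in the table cache. On a miss (unless
// no_io is set), the file is opened under a striped loader mutex so that
// concurrent lookups open it only once, and the new reader is inserted into
// the cache.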
Status TableCache::FindTable(const ReadOptions& ro,
                             const FileOptions& file_options,
                             const InternalKeyComparator& internal_comparator,
                             const FileDescriptor& fd, Cache::Handle** handle,
                             const SliceTransform* prefix_extractor,
                             const bool no_io, bool record_read_stats,
                             HistogramImpl* file_read_hist, bool skip_filters,
                             int level, bool prefetch_index_and_filter_in_cache,
                             size_t max_file_size_for_l0_meta_pin) {
  PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  *handle = cache_->Lookup(key);
  TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
                           const_cast<bool*>(&no_io));

  if (*handle == nullptr) {
    if (no_io) {
      return Status::Incomplete("Table not found in table_cache, no_io is set");
    }
    MutexLock load_lock(loader_mutex_.get(key));
    // We check the cache again under the loading mutex.
    *handle = cache_->Lookup(key);
    if (*handle != nullptr) {
      return Status::OK();
    }

    std::unique_ptr<TableReader> table_reader;
    Status s = GetTableReader(
        ro, file_options, internal_comparator, fd, false /* sequential mode */,
        record_read_stats, file_read_hist, &table_reader, prefix_extractor,
        skip_filters, level, prefetch_index_and_filter_in_cache,
        max_file_size_for_l0_meta_pin);
    if (!s.ok()) {
      assert(table_reader == nullptr);
      RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
    } else {
      s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
                         handle);
      if (s.ok()) {
        // Release ownership of table reader.
        table_reader.release();
      }
    }
    return s;
  }
  return Status::OK();
}

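// Creates an internal iterator over the file for `caller`. If the reader came
// from the table cache, the handle is released through the iterator's cleanup
// callback, and any range tombstones in the file are added to `range_del_agg`
// unless range deletions are ignored.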
InternalIterator* TableCache::NewIterator(
    const ReadOptions& options, const FileOptions& file_options,
    const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
    RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor,
    TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
    TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
    size_t max_file_size_for_l0_meta_pin,
    const InternalKey* smallest_compaction_key,
    const InternalKey* largest_compaction_key, bool allow_unprepared_value) {
  PERF_TIMER_GUARD(new_table_iterator_nanos);

  Status s;
  TableReader* table_reader = nullptr;
  Cache::Handle* handle = nullptr;
  if (table_reader_ptr != nullptr) {
    *table_reader_ptr = nullptr;
  }
  bool for_compaction = caller == TableReaderCaller::kCompaction;
  auto& fd = file_meta.fd;
  table_reader = fd.table_reader;
  if (table_reader == nullptr) {
    s = FindTable(
        options, file_options, icomparator, fd, &handle, prefix_extractor,
        options.read_tier == kBlockCacheTier /* no_io */,
        !for_compaction /* record_read_stats */, file_read_hist, skip_filters,
        level, true /* prefetch_index_and_filter_in_cache */,
        max_file_size_for_l0_meta_pin);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(handle);
    }
  }
  InternalIterator* result = nullptr;
  if (s.ok()) {
    if (options.table_filter &&
        !options.table_filter(*table_reader->GetTableProperties())) {
      result = NewEmptyInternalIterator<Slice>(arena);
    } else {
      result = table_reader->NewIterator(
          options, prefix_extractor, arena, skip_filters, caller,
          file_options.compaction_readahead_size, allow_unprepared_value);
    }
    if (handle != nullptr) {
      result->RegisterCleanup(&UnrefEntry, cache_, handle);
      handle = nullptr;  // prevent it from being released below
    }

    if (for_compaction) {
      table_reader->SetupForCompaction();
    }
    if (table_reader_ptr != nullptr) {
      *table_reader_ptr = table_reader;
    }
  }
  if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
    if (range_del_agg->AddFile(fd.GetNumber())) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          static_cast<FragmentedRangeTombstoneIterator*>(
              table_reader->NewRangeTombstoneIterator(options)));
      if (range_del_iter != nullptr) {
        s = range_del_iter->status();
      }
      if (s.ok()) {
        const InternalKey* smallest = &file_meta.smallest;
        const InternalKey* largest = &file_meta.largest;
        if (smallest_compaction_key != nullptr) {
          smallest = smallest_compaction_key;
        }
        if (largest_compaction_key != nullptr) {
          largest = largest_compaction_key;
        }
        range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
                                     largest);
      }
    }
  }

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  if (!s.ok()) {
    assert(result == nullptr);
    result = NewErrorInternalIterator<Slice>(s, arena);
  }
  return result;
}

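// Produces a fragmented range-tombstone iterator for the file, loading the
// table reader through the cache if it has not been pre-loaded.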
Status TableCache::GetRangeTombstoneIterator(
    const ReadOptions& options,
    const InternalKeyComparator& internal_comparator,
    const FileMetaData& file_meta,
    std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter) {
  const FileDescriptor& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (t == nullptr) {
    s = FindTable(options, file_options_, internal_comparator, fd, &handle);
    if (s.ok()) {
      t = GetTableReaderFromHandle(handle);
    }
  }
  if (s.ok()) {
    out_iter->reset(t->NewRangeTombstoneIterator(options));
    assert(out_iter);
  }
  return s;
}

#ifndef ROCKSDB_LITE
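// Builds the fixed prefix of a row cache key: the per-cache id, the file
// number, and a sequence number component (0 unless a snapshot read needs to
// be distinguished). GetFromRowCache() appends the user key to this prefix.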
void TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
                                         const FileDescriptor& fd,
                                         const Slice& internal_key,
                                         GetContext* get_context,
                                         IterKey& row_cache_key) {
  uint64_t fd_number = fd.GetNumber();
  // We use the user key as the cache key instead of the internal key,
  // otherwise the whole cache would be invalidated every time the
  // sequence key increases. However, to support caching snapshot
  // reads, we append the sequence number (incremented by 1 to
  // distinguish from 0) only in this case.
  // If the snapshot is larger than the largest seqno in the file,
  // all data should be exposed to the snapshot, so we treat it
  // the same as there being no snapshot. The exception is that if
  // a seq-checking callback is registered, some internal keys
  // may still be filtered out.
  uint64_t seq_no = 0;
  // Maybe we can include the whole file if snapshot == fd.largest_seqno.
  if (options.snapshot != nullptr &&
      (get_context->has_callback() ||
       static_cast_with_check<const SnapshotImpl>(options.snapshot)
               ->GetSequenceNumber() <= fd.largest_seqno)) {
    // We should consider using options.snapshot->GetSequenceNumber()
    // instead of GetInternalKeySeqno(internal_key), which would make the
    // code easier to understand.
    seq_no = 1 + GetInternalKeySeqno(internal_key);
  }

  // Compute the row cache key prefix.
  row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
                           row_cache_id_.size());
  AppendVarint64(&row_cache_key, fd_number);
  AppendVarint64(&row_cache_key, seq_no);
}

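// Probes the row cache for `user_key` appended to the previously built key
// prefix. On a hit, the cached replay log is re-applied to `get_context` and
// the cache entry stays pinned until the context's pinnable slice is reset.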
bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
                                 size_t prefix_size, GetContext* get_context) {
  bool found = false;

  row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
  if (auto row_handle =
          ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
    // Cleanable routine to release the cache entry
    Cleanable value_pinner;
    auto release_cache_entry_func = [](void* cache_to_clean,
                                       void* cache_handle) {
      ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
    };
    auto found_row_cache_entry = static_cast<const std::string*>(
        ioptions_.row_cache->Value(row_handle));
    // If we get here, the value is located in the cache.
    // found_row_cache_entry points to the value in the cache,
    // and value_pinner has the cleanup procedure for the cached entry.
    // After replayGetContextLog() returns, get_context.pinnable_slice_
    // will point to the cache entry buffer (or a copy based on it) and
    // the cleanup routine under value_pinner will be delegated to
    // get_context.pinnable_slice_. The cache entry is released when
    // get_context.pinnable_slice_ is reset.
    value_pinner.RegisterCleanup(release_cache_entry_func,
                                 ioptions_.row_cache.get(), row_handle);
    replayGetContextLog(*found_row_cache_entry, user_key, get_context,
                        &value_pinner);
    RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
    found = true;
  } else {
    RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
  }
  return found;
}
#endif  // ROCKSDB_LITE

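// Point lookup of `k` in this file. In non-LITE builds, when the row cache is
// enabled and no sequence number is needed, the row cache is consulted first
// and populated from the replay log after a successful table read.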
Status TableCache::Get(const ReadOptions& options,
                       const InternalKeyComparator& internal_comparator,
                       const FileMetaData& file_meta, const Slice& k,
                       GetContext* get_context,
                       const SliceTransform* prefix_extractor,
                       HistogramImpl* file_read_hist, bool skip_filters,
                       int level, size_t max_file_size_for_l0_meta_pin) {
  auto& fd = file_meta.fd;
  std::string* row_cache_entry = nullptr;
  bool done = false;
#ifndef ROCKSDB_LITE
  IterKey row_cache_key;
  std::string row_cache_entry_buffer;

  // Check the row cache if enabled. Since the row cache does not currently
  // store sequence numbers, we cannot use it if we need to fetch the sequence.
  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
    auto user_key = ExtractUserKey(k);
    CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
    done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
                           get_context);
    if (!done) {
      row_cache_entry = &row_cache_entry_buffer;
    }
  }
#endif  // ROCKSDB_LITE
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (!done) {
    assert(s.ok());
    if (t == nullptr) {
      s = FindTable(options, file_options_, internal_comparator, fd, &handle,
                    prefix_extractor,
                    options.read_tier == kBlockCacheTier /* no_io */,
                    true /* record_read_stats */, file_read_hist, skip_filters,
                    level, true /* prefetch_index_and_filter_in_cache */,
                    max_file_size_for_l0_meta_pin);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
      }
    }
    SequenceNumber* max_covering_tombstone_seq =
        get_context->max_covering_tombstone_seq();
    if (s.ok() && max_covering_tombstone_seq != nullptr &&
        !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        *max_covering_tombstone_seq = std::max(
            *max_covering_tombstone_seq,
            range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
      }
    }
    if (s.ok()) {
      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
      s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
      get_context->SetReplayLog(nullptr);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      // Couldn't find the table in the cache, but treat as kFound if no_io
      // is set.
      get_context->MarkKeyMayExist();
      s = Status::OK();
      done = true;
    }
  }

#ifndef ROCKSDB_LITE
  // Put the replay log in the row cache only if something was found.
  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
    size_t charge =
        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
    void* row_ptr = new std::string(std::move(*row_cache_entry));
    // If the row cache is full, it's OK to continue.
    ioptions_.row_cache
        ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                 &DeleteEntry<std::string>)
        .PermitUncheckedError();
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

// Batched version of TableCache::Get.
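// Keys satisfied from the row cache are skipped from the range before the
// remaining keys are passed to TableReader::MultiGet().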
Status TableCache::MultiGet(const ReadOptions& options,
                            const InternalKeyComparator& internal_comparator,
                            const FileMetaData& file_meta,
                            const MultiGetContext::Range* mget_range,
                            const SliceTransform* prefix_extractor,
                            HistogramImpl* file_read_hist, bool skip_filters,
                            int level) {
  auto& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  MultiGetRange table_range(*mget_range, mget_range->begin(),
                            mget_range->end());
#ifndef ROCKSDB_LITE
  autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
  IterKey row_cache_key;
  size_t row_cache_key_prefix_size = 0;
  KeyContext& first_key = *table_range.begin();
  bool lookup_row_cache =
      ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();

  // Check the row cache if enabled. Since the row cache does not currently
  // store sequence numbers, we cannot use it if we need to fetch the sequence.
  if (lookup_row_cache) {
    GetContext* first_context = first_key.get_context;
    CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
                            row_cache_key);
    row_cache_key_prefix_size = row_cache_key.Size();

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      const Slice& user_key = miter->ukey_with_ts;

      GetContext* get_context = miter->get_context;

      if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
                          get_context)) {
        table_range.SkipKey(miter);
      } else {
        row_cache_entries.emplace_back();
        get_context->SetReplayLog(&(row_cache_entries.back()));
      }
    }
  }
#endif  // ROCKSDB_LITE

  // Check that table_range is not empty. It's possible that all keys were
  // found in the row cache, in which case the range is now empty.
  if (s.ok() && !table_range.empty()) {
    if (t == nullptr) {
      s = FindTable(
          options, file_options_, internal_comparator, fd, &handle,
          prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
          true /* record_read_stats */, file_read_hist, skip_filters, level);
      TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
        assert(t);
      }
    }
    if (s.ok() && !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        for (auto iter = table_range.begin(); iter != table_range.end();
             ++iter) {
          SequenceNumber* max_covering_tombstone_seq =
              iter->get_context->max_covering_tombstone_seq();
          *max_covering_tombstone_seq = std::max(
              *max_covering_tombstone_seq,
              range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
        }
      }
    }
    if (s.ok()) {
      t->MultiGet(options, &table_range, prefix_extractor, skip_filters);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
        Status* status = iter->s;
        if (status->IsIncomplete()) {
          // Couldn't find the table in the cache, but treat as kFound if
          // no_io is set.
          iter->get_context->MarkKeyMayExist();
          s = Status::OK();
        }
      }
    }
  }

#ifndef ROCKSDB_LITE
  if (lookup_row_cache) {
    size_t row_idx = 0;

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      std::string& row_cache_entry = row_cache_entries[row_idx++];
      const Slice& user_key = miter->ukey_with_ts;
      GetContext* get_context = miter->get_context;

      get_context->SetReplayLog(nullptr);
      // Compute the row cache key.
      row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
                               user_key.size());
      // Put the replay log in the row cache only if something was found.
      if (s.ok() && !row_cache_entry.empty()) {
        size_t charge =
            row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
        void* row_ptr = new std::string(std::move(row_cache_entry));
        // If the row cache is full, it's OK.
        ioptions_.row_cache
            ->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                     &DeleteEntry<std::string>)
            .PermitUncheckedError();
      }
    }
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

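// Returns the table properties for the file, using the pre-loaded reader if
// one is attached to the file descriptor, otherwise going through the cache.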
Status TableCache::GetTableProperties(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    std::shared_ptr<const TableProperties>* properties,
    const SliceTransform* prefix_extractor, bool no_io) {
  auto table_reader = fd.table_reader;
  // Has the table already been pre-loaded?
  if (table_reader) {
    *properties = table_reader->GetTableProperties();
    return Status::OK();
  }

  Cache::Handle* table_handle = nullptr;
  Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
                       &table_handle, prefix_extractor, no_io);
  if (!s.ok()) {
    return s;
  }
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  *properties = table->GetTableProperties();
  ReleaseHandle(table_handle);
  return s;
}

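// Returns the TableReader's approximate memory usage, or 0 if the reader is
// neither pre-loaded nor already present in the table cache (the lookup is
// done with no_io set).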
size_t TableCache::GetMemoryUsageByTableReader(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    const SliceTransform* prefix_extractor) {
  auto table_reader = fd.table_reader;
  // Has the table already been pre-loaded?
  if (table_reader) {
    return table_reader->ApproximateMemoryUsage();
  }

  Cache::Handle* table_handle = nullptr;
  Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
                       &table_handle, prefix_extractor, true /* no_io */);
  if (!s.ok()) {
    return 0;
  }
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  auto ret = table->ApproximateMemoryUsage();
  ReleaseHandle(table_handle);
  return ret;
}

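// Static helper: erases the cache entry keyed by `file_number` from `cache`.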
void TableCache::Evict(Cache* cache, uint64_t file_number) {
  cache->Erase(GetSliceForFileNumber(&file_number));
}

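// Estimates the offset of `key` within the file, loading the TableReader
// through the cache if it is not pre-loaded. Returns 0 on failure.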
uint64_t TableCache::ApproximateOffsetOf(
    const Slice& key, const FileDescriptor& fd, TableReaderCaller caller,
    const InternalKeyComparator& internal_comparator,
    const SliceTransform* prefix_extractor) {
  uint64_t result = 0;
  TableReader* table_reader = fd.table_reader;
  Cache::Handle* table_handle = nullptr;
  if (table_reader == nullptr) {
    const bool for_compaction = (caller == TableReaderCaller::kCompaction);
    Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
                         &table_handle, prefix_extractor, false /* no_io */,
                         !for_compaction /* record_read_stats */);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(table_handle);
    }
  }

  if (table_reader != nullptr) {
    result = table_reader->ApproximateOffsetOf(key, caller);
  }
  if (table_handle != nullptr) {
    ReleaseHandle(table_handle);
  }

  return result;
}

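// Estimates the on-disk size of the data between `start` and `end` in the
// file, following the same reader-loading logic as ApproximateOffsetOf().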
uint64_t TableCache::ApproximateSize(
    const Slice& start, const Slice& end, const FileDescriptor& fd,
    TableReaderCaller caller, const InternalKeyComparator& internal_comparator,
    const SliceTransform* prefix_extractor) {
  uint64_t result = 0;
  TableReader* table_reader = fd.table_reader;
  Cache::Handle* table_handle = nullptr;
  if (table_reader == nullptr) {
    const bool for_compaction = (caller == TableReaderCaller::kCompaction);
    Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd,
                         &table_handle, prefix_extractor, false /* no_io */,
                         !for_compaction /* record_read_stats */);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(table_handle);
    }
  }

  if (table_reader != nullptr) {
    result = table_reader->ApproximateSize(start, end, caller);
  }
  if (table_handle != nullptr) {
    ReleaseHandle(table_handle);
  }

  return result;
}
}  // namespace ROCKSDB_NAMESPACE