1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "table/get_context.h"
8 #include "db/blob//blob_fetcher.h"
9 #include "db/merge_helper.h"
10 #include "db/pinned_iterators_manager.h"
11 #include "db/read_callback.h"
12 #include "db/wide/wide_column_serialization.h"
13 #include "monitoring/file_read_sample.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "monitoring/statistics.h"
16 #include "rocksdb/merge_operator.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/system_clock.h"
20 namespace ROCKSDB_NAMESPACE
{
24 void appendToReplayLog(std::string
* replay_log
, ValueType type
, Slice value
) {
27 if (replay_log
->empty()) {
28 // Optimization: in the common case of only one operation in the
29 // log, we allocate the exact amount of space needed.
30 replay_log
->reserve(1 + VarintLength(value
.size()) + value
.size());
32 replay_log
->push_back(type
);
33 PutLengthPrefixedSlice(replay_log
, value
);
39 #endif // ROCKSDB_LITE
44 GetContext::GetContext(
45 const Comparator
* ucmp
, const MergeOperator
* merge_operator
, Logger
* logger
,
46 Statistics
* statistics
, GetState init_state
, const Slice
& user_key
,
47 PinnableSlice
* pinnable_val
, PinnableWideColumns
* columns
,
48 std::string
* timestamp
, bool* value_found
, MergeContext
* merge_context
,
49 bool do_merge
, SequenceNumber
* _max_covering_tombstone_seq
,
50 SystemClock
* clock
, SequenceNumber
* seq
,
51 PinnedIteratorsManager
* _pinned_iters_mgr
, ReadCallback
* callback
,
52 bool* is_blob_index
, uint64_t tracing_get_id
, BlobFetcher
* blob_fetcher
)
54 merge_operator_(merge_operator
),
56 statistics_(statistics
),
59 pinnable_val_(pinnable_val
),
61 timestamp_(timestamp
),
62 value_found_(value_found
),
63 merge_context_(merge_context
),
64 max_covering_tombstone_seq_(_max_covering_tombstone_seq
),
68 pinned_iters_mgr_(_pinned_iters_mgr
),
71 is_blob_index_(is_blob_index
),
72 tracing_get_id_(tracing_get_id
),
73 blob_fetcher_(blob_fetcher
) {
75 *seq_
= kMaxSequenceNumber
;
77 sample_
= should_sample_file_read();
80 GetContext::GetContext(const Comparator
* ucmp
,
81 const MergeOperator
* merge_operator
, Logger
* logger
,
82 Statistics
* statistics
, GetState init_state
,
83 const Slice
& user_key
, PinnableSlice
* pinnable_val
,
84 PinnableWideColumns
* columns
, bool* value_found
,
85 MergeContext
* merge_context
, bool do_merge
,
86 SequenceNumber
* _max_covering_tombstone_seq
,
87 SystemClock
* clock
, SequenceNumber
* seq
,
88 PinnedIteratorsManager
* _pinned_iters_mgr
,
89 ReadCallback
* callback
, bool* is_blob_index
,
90 uint64_t tracing_get_id
, BlobFetcher
* blob_fetcher
)
91 : GetContext(ucmp
, merge_operator
, logger
, statistics
, init_state
, user_key
,
92 pinnable_val
, columns
, /*timestamp=*/nullptr, value_found
,
93 merge_context
, do_merge
, _max_covering_tombstone_seq
, clock
,
94 seq
, _pinned_iters_mgr
, callback
, is_blob_index
,
95 tracing_get_id
, blob_fetcher
) {}
97 // Called from TableCache::Get and Table::Get when file/block in which
98 // key may exist are not there in TableCache/BlockCache respectively. In this
99 // case we can't guarantee that key does not exist and are not permitted to do
100 // IO to be certain.Set the status=kFound and value_found=false to let the
101 // caller know that key may exist but is not there in memory
102 void GetContext::MarkKeyMayExist() {
104 if (value_found_
!= nullptr) {
105 *value_found_
= false;
109 void GetContext::SaveValue(const Slice
& value
, SequenceNumber
/*seq*/) {
110 assert(state_
== kNotFound
);
111 appendToReplayLog(replay_log_
, kTypeValue
, value
);
114 if (LIKELY(pinnable_val_
!= nullptr)) {
115 pinnable_val_
->PinSelf(value
);
119 void GetContext::ReportCounters() {
120 if (get_context_stats_
.num_cache_hit
> 0) {
121 RecordTick(statistics_
, BLOCK_CACHE_HIT
, get_context_stats_
.num_cache_hit
);
123 if (get_context_stats_
.num_cache_index_hit
> 0) {
124 RecordTick(statistics_
, BLOCK_CACHE_INDEX_HIT
,
125 get_context_stats_
.num_cache_index_hit
);
127 if (get_context_stats_
.num_cache_data_hit
> 0) {
128 RecordTick(statistics_
, BLOCK_CACHE_DATA_HIT
,
129 get_context_stats_
.num_cache_data_hit
);
131 if (get_context_stats_
.num_cache_filter_hit
> 0) {
132 RecordTick(statistics_
, BLOCK_CACHE_FILTER_HIT
,
133 get_context_stats_
.num_cache_filter_hit
);
135 if (get_context_stats_
.num_cache_compression_dict_hit
> 0) {
136 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_HIT
,
137 get_context_stats_
.num_cache_compression_dict_hit
);
139 if (get_context_stats_
.num_cache_index_miss
> 0) {
140 RecordTick(statistics_
, BLOCK_CACHE_INDEX_MISS
,
141 get_context_stats_
.num_cache_index_miss
);
143 if (get_context_stats_
.num_cache_filter_miss
> 0) {
144 RecordTick(statistics_
, BLOCK_CACHE_FILTER_MISS
,
145 get_context_stats_
.num_cache_filter_miss
);
147 if (get_context_stats_
.num_cache_data_miss
> 0) {
148 RecordTick(statistics_
, BLOCK_CACHE_DATA_MISS
,
149 get_context_stats_
.num_cache_data_miss
);
151 if (get_context_stats_
.num_cache_compression_dict_miss
> 0) {
152 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_MISS
,
153 get_context_stats_
.num_cache_compression_dict_miss
);
155 if (get_context_stats_
.num_cache_bytes_read
> 0) {
156 RecordTick(statistics_
, BLOCK_CACHE_BYTES_READ
,
157 get_context_stats_
.num_cache_bytes_read
);
159 if (get_context_stats_
.num_cache_miss
> 0) {
160 RecordTick(statistics_
, BLOCK_CACHE_MISS
,
161 get_context_stats_
.num_cache_miss
);
163 if (get_context_stats_
.num_cache_add
> 0) {
164 RecordTick(statistics_
, BLOCK_CACHE_ADD
, get_context_stats_
.num_cache_add
);
166 if (get_context_stats_
.num_cache_add_redundant
> 0) {
167 RecordTick(statistics_
, BLOCK_CACHE_ADD_REDUNDANT
,
168 get_context_stats_
.num_cache_add_redundant
);
170 if (get_context_stats_
.num_cache_bytes_write
> 0) {
171 RecordTick(statistics_
, BLOCK_CACHE_BYTES_WRITE
,
172 get_context_stats_
.num_cache_bytes_write
);
174 if (get_context_stats_
.num_cache_index_add
> 0) {
175 RecordTick(statistics_
, BLOCK_CACHE_INDEX_ADD
,
176 get_context_stats_
.num_cache_index_add
);
178 if (get_context_stats_
.num_cache_index_add_redundant
> 0) {
179 RecordTick(statistics_
, BLOCK_CACHE_INDEX_ADD_REDUNDANT
,
180 get_context_stats_
.num_cache_index_add_redundant
);
182 if (get_context_stats_
.num_cache_index_bytes_insert
> 0) {
183 RecordTick(statistics_
, BLOCK_CACHE_INDEX_BYTES_INSERT
,
184 get_context_stats_
.num_cache_index_bytes_insert
);
186 if (get_context_stats_
.num_cache_data_add
> 0) {
187 RecordTick(statistics_
, BLOCK_CACHE_DATA_ADD
,
188 get_context_stats_
.num_cache_data_add
);
190 if (get_context_stats_
.num_cache_data_add_redundant
> 0) {
191 RecordTick(statistics_
, BLOCK_CACHE_DATA_ADD_REDUNDANT
,
192 get_context_stats_
.num_cache_data_add_redundant
);
194 if (get_context_stats_
.num_cache_data_bytes_insert
> 0) {
195 RecordTick(statistics_
, BLOCK_CACHE_DATA_BYTES_INSERT
,
196 get_context_stats_
.num_cache_data_bytes_insert
);
198 if (get_context_stats_
.num_cache_filter_add
> 0) {
199 RecordTick(statistics_
, BLOCK_CACHE_FILTER_ADD
,
200 get_context_stats_
.num_cache_filter_add
);
202 if (get_context_stats_
.num_cache_filter_add_redundant
> 0) {
203 RecordTick(statistics_
, BLOCK_CACHE_FILTER_ADD_REDUNDANT
,
204 get_context_stats_
.num_cache_filter_add_redundant
);
206 if (get_context_stats_
.num_cache_filter_bytes_insert
> 0) {
207 RecordTick(statistics_
, BLOCK_CACHE_FILTER_BYTES_INSERT
,
208 get_context_stats_
.num_cache_filter_bytes_insert
);
210 if (get_context_stats_
.num_cache_compression_dict_add
> 0) {
211 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_ADD
,
212 get_context_stats_
.num_cache_compression_dict_add
);
214 if (get_context_stats_
.num_cache_compression_dict_add_redundant
> 0) {
215 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT
,
216 get_context_stats_
.num_cache_compression_dict_add_redundant
);
218 if (get_context_stats_
.num_cache_compression_dict_bytes_insert
> 0) {
219 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT
,
220 get_context_stats_
.num_cache_compression_dict_bytes_insert
);
// Core per-entry visitor of a point lookup: invoked for each internal entry
// whose user key may equal user_key_. Returns true to keep scanning older
// entries for this key, false once a terminal decision has been reached.
// NOTE(review): this excerpt is whitespace-mangled and several original lines
// are absent from the visible text (e.g. the switch header over `type`, the
// kTypeValue/kTypeMerge/kTypeBlobIndex case labels, closing braces, returns
// and state_ transitions). Code is left byte-identical; comments only added.
224 bool GetContext::SaveValue(const ParsedInternalKey
& parsed_key
,
225 const Slice
& value
, bool* matched
,
226 Cleanable
* value_pinner
) {
// Merge entries are only legal when a MergeContext was supplied.
228 assert((state_
!= kMerge
&& parsed_key
.type
!= kTypeMerge
) ||
229 merge_context_
!= nullptr);
230 if (ucmp_
->EqualWithoutTimestamp(parsed_key
.user_key
, user_key_
)) {
232 // If the value is not in the snapshot, skip it
233 if (!CheckCallback(parsed_key
.sequence
)) {
234 return true; // to continue to the next seq
237 appendToReplayLog(replay_log_
, parsed_key
.type
, value
);
// Report the first (newest) sequence number seen, capped by any covering
// range tombstone.
239 if (seq_
!= nullptr) {
240 // Set the sequence number if it is uninitialized
241 if (*seq_
== kMaxSequenceNumber
) {
242 *seq_
= parsed_key
.sequence
;
244 if (max_covering_tombstone_seq_
) {
245 *seq_
= std::max(*seq_
, *max_covering_tombstone_seq_
);
// Propagate this entry's user-defined timestamp to the caller, if requested.
249 size_t ts_sz
= ucmp_
->timestamp_size();
250 if (ts_sz
> 0 && timestamp_
!= nullptr) {
251 if (!timestamp_
->empty()) {
252 assert(ts_sz
== timestamp_
->size());
253 // `timestamp` can be set before `SaveValue` is ever called
254 // when max_covering_tombstone_seq_ was set.
255 // If this key has a higher sequence number than range tombstone,
256 // then timestamp should be updated. `ts_from_rangetombstone_` is
257 // set to false afterwards so that only the key with highest seqno
258 // updates the timestamp.
259 if (ts_from_rangetombstone_
) {
260 assert(max_covering_tombstone_seq_
);
261 if (parsed_key
.sequence
> *max_covering_tombstone_seq_
) {
262 Slice ts
= ExtractTimestampFromUserKey(parsed_key
.user_key
, ts_sz
);
263 timestamp_
->assign(ts
.data(), ts
.size());
264 ts_from_rangetombstone_
= false;
268 // TODO optimize for small size ts
269 const std::string
kMaxTs(ts_sz
, '\xff');
270 if (timestamp_
->empty() ||
271 ucmp_
->CompareTimestamp(*timestamp_
, kMaxTs
) == 0) {
272 Slice ts
= ExtractTimestampFromUserKey(parsed_key
.user_key
, ts_sz
);
273 timestamp_
->assign(ts
.data(), ts
.size());
277 auto type
= parsed_key
.type
;
278 // Key matches. Process it
// A newer covering range tombstone overrides the entry's own type.
279 if ((type
== kTypeValue
|| type
== kTypeMerge
|| type
== kTypeBlobIndex
||
280 type
== kTypeWideColumnEntity
|| type
== kTypeDeletion
||
281 type
== kTypeDeletionWithTimestamp
|| type
== kTypeSingleDeletion
) &&
282 max_covering_tombstone_seq_
!= nullptr &&
283 *max_covering_tombstone_seq_
> parsed_key
.sequence
) {
284 // Note that deletion types are also considered, this is for the case
285 // when we need to return timestamp to user. If a range tombstone has a
286 // higher seqno than point tombstone, its timestamp should be returned.
287 type
= kTypeRangeDeletion
;
// Value-like entry types (the visible text suggests a fall-through group of
// case labels ending in kTypeWideColumnEntity -- the switch header and the
// sibling labels are missing from this excerpt).
292 case kTypeWideColumnEntity
:
293 assert(state_
== kNotFound
|| state_
== kMerge
);
294 if (type
== kTypeBlobIndex
) {
295 if (is_blob_index_
== nullptr) {
296 // Blob value not supported. Stop.
297 state_
= kUnexpectedBlobIndex
;
302 if (is_blob_index_
!= nullptr) {
303 *is_blob_index_
= (type
== kTypeBlobIndex
);
306 if (kNotFound
== state_
) {
309 if (LIKELY(pinnable_val_
!= nullptr)) {
310 Slice value_to_use
= value
;
// Wide-column entities surface only their default column as the "value".
312 if (type
== kTypeWideColumnEntity
) {
313 Slice value_copy
= value
;
315 if (!WideColumnSerialization::GetValueOfDefaultColumn(
316 value_copy
, value_to_use
)
323 if (LIKELY(value_pinner
!= nullptr)) {
324 // If the backing resources for the value are provided, pin them
325 pinnable_val_
->PinSlice(value_to_use
, value_pinner
);
327 TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
329 // Otherwise copy the value
330 pinnable_val_
->PinSelf(value_to_use
);
332 } else if (columns_
!= nullptr) {
333 if (type
== kTypeWideColumnEntity
) {
334 if (!columns_
->SetWideColumnValue(value
, value_pinner
).ok()) {
339 columns_
->SetPlainValue(value
, value_pinner
);
343 // It means this function is called as part of DB GetMergeOperands
344 // API and the current value should be part of
345 // merge_context_->operand_list
346 if (type
== kTypeBlobIndex
) {
347 PinnableSlice pin_val
;
348 if (GetBlobValue(value
, &pin_val
) == false) {
351 Slice
blob_value(pin_val
);
352 push_operand(blob_value
, nullptr);
353 } else if (type
== kTypeWideColumnEntity
) {
354 Slice value_copy
= value
;
355 Slice value_of_default
;
357 if (!WideColumnSerialization::GetValueOfDefaultColumn(
358 value_copy
, value_of_default
)
364 push_operand(value_of_default
, value_pinner
);
366 assert(type
== kTypeValue
);
367 push_operand(value
, value_pinner
);
// A pending merge is resolved against this base value.
370 } else if (kMerge
== state_
) {
371 assert(merge_operator_
!= nullptr);
372 if (type
== kTypeBlobIndex
) {
373 PinnableSlice pin_val
;
374 if (GetBlobValue(value
, &pin_val
) == false) {
377 Slice
blob_value(pin_val
);
382 // It means this function is called as part of DB GetMergeOperands
383 // API and the current value should be part of
384 // merge_context_->operand_list
385 push_operand(blob_value
, nullptr);
387 } else if (type
== kTypeWideColumnEntity
) {
391 MergeWithEntity(value
);
393 // It means this function is called as part of DB GetMergeOperands
394 // API and the current value should be part of
395 // merge_context_->operand_list
396 Slice value_copy
= value
;
397 Slice value_of_default
;
399 if (!WideColumnSerialization::GetValueOfDefaultColumn(
400 value_copy
, value_of_default
)
406 push_operand(value_of_default
, value_pinner
);
409 assert(type
== kTypeValue
);
415 // It means this function is called as part of DB GetMergeOperands
416 // API and the current value should be part of
417 // merge_context_->operand_list
418 push_operand(value
, value_pinner
);
// Tombstone types: either conclude "deleted" or finish a pending merge with
// no base value (the bodies of these branches are largely missing here).
425 case kTypeDeletionWithTimestamp
:
426 case kTypeSingleDeletion
:
427 case kTypeRangeDeletion
:
428 // TODO(noetzli): Verify correctness once merge of single-deletes
430 assert(state_
== kNotFound
|| state_
== kMerge
);
431 if (kNotFound
== state_
) {
433 } else if (kMerge
== state_
) {
// Merge operand: accumulate it and optionally merge eagerly if the operator
// says enough operands have been collected.
438 // If do_merge_ = false then the current value shouldn't be part of
439 // merge_context_->operand_list
444 assert(state_
== kNotFound
|| state_
== kMerge
);
446 // value_pinner is not set from plain_table_reader.cc for example.
447 push_operand(value
, value_pinner
);
448 if (do_merge_
&& merge_operator_
!= nullptr &&
449 merge_operator_
->ShouldMerge(
450 merge_context_
->GetOperandsDirectionBackward())) {
463 // state_ could be Corrupt, merge or notfound
// Runs a full merge of the accumulated operands with the (possibly null)
// base value and stores the result into whichever output is configured
// (pinnable_val_ or columns_ -- the assert shows they are mutually
// exclusive). NOTE(review): this excerpt is whitespace-mangled and missing
// lines (the `result` declaration, error handling, closing braces); code is
// left byte-identical, comments only added.
467 void GetContext::Merge(const Slice
* value
) {
469 assert(!pinnable_val_
|| !columns_
);
472 const Status s
= MergeHelper::TimedFullMerge(
473 merge_operator_
, user_key_
, value
, merge_context_
->GetOperands(), &result
,
474 logger_
, statistics_
, clock_
, /* result_operand */ nullptr,
475 /* update_num_ops_stats */ true);
481 if (LIKELY(pinnable_val_
!= nullptr)) {
// Move the merged result into the pinnable slice's own storage.
482 *(pinnable_val_
->GetSelf()) = std::move(result
);
483 pinnable_val_
->PinSelf();
488 columns_
->SetPlainValue(result
);
// Resolves a pending merge against a wide-column entity base value. For the
// pinnable_val_ output, merges against the entity's default column; for the
// columns_ output, performs an entity-aware full merge and stores the
// resulting wide-column value. NOTE(review): this excerpt is
// whitespace-mangled and missing lines (status checks, branch/closing
// braces, the `result` declaration); code is left byte-identical, comments
// only added.
491 void GetContext::MergeWithEntity(Slice entity
) {
493 assert(!pinnable_val_
|| !columns_
);
495 if (LIKELY(pinnable_val_
!= nullptr)) {
496 Slice value_of_default
;
// Extract the default column to serve as the merge base.
499 const Status s
= WideColumnSerialization::GetValueOfDefaultColumn(
500 entity
, value_of_default
);
508 const Status s
= MergeHelper::TimedFullMerge(
509 merge_operator_
, user_key_
, &value_of_default
,
510 merge_context_
->GetOperands(), pinnable_val_
->GetSelf(), logger_
,
511 statistics_
, clock_
, /* result_operand */ nullptr,
512 /* update_num_ops_stats */ true);
519 pinnable_val_
->PinSelf();
// Wide-column output path: merge the whole entity.
526 const Status s
= MergeHelper::TimedFullMergeWithEntity(
527 merge_operator_
, user_key_
, entity
, merge_context_
->GetOperands(),
528 &result
, logger_
, statistics_
, clock_
, /* update_num_ops_stats */ true);
537 const Status s
= columns_
->SetWideColumnValue(result
);
545 bool GetContext::GetBlobValue(const Slice
& blob_index
,
546 PinnableSlice
* blob_value
) {
547 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
548 constexpr uint64_t* bytes_read
= nullptr;
550 Status status
= blob_fetcher_
->FetchBlob(
551 user_key_
, blob_index
, prefetch_buffer
, blob_value
, bytes_read
);
553 if (status
.IsIncomplete()) {
554 // FIXME: this code is not covered by unit tests
561 *is_blob_index_
= false;
565 void GetContext::push_operand(const Slice
& value
, Cleanable
* value_pinner
) {
566 // TODO(yanqin) preserve timestamps information in merge_context
567 if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
568 value_pinner
!= nullptr) {
569 value_pinner
->DelegateCleanupsTo(pinned_iters_mgr());
570 merge_context_
->PushOperand(value
, true /*value_pinned*/);
572 merge_context_
->PushOperand(value
, false);
576 void replayGetContextLog(const Slice
& replay_log
, const Slice
& user_key
,
577 GetContext
* get_context
, Cleanable
* value_pinner
) {
579 Slice s
= replay_log
;
581 auto type
= static_cast<ValueType
>(*s
.data());
584 bool ret
= GetLengthPrefixedSlice(&s
, &value
);
588 bool dont_care
__attribute__((__unused__
));
589 // Since SequenceNumber is not stored and unknown, we will use
590 // kMaxSequenceNumber.
591 get_context
->SaveValue(
592 ParsedInternalKey(user_key
, kMaxSequenceNumber
, type
), value
,
593 &dont_care
, value_pinner
);
595 #else // ROCKSDB_LITE
601 #endif // ROCKSDB_LITE
604 } // namespace ROCKSDB_NAMESPACE