1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "table/get_context.h"
7 #include "db/merge_helper.h"
8 #include "db/pinned_iterators_manager.h"
9 #include "db/read_callback.h"
10 #include "monitoring/file_read_sample.h"
11 #include "monitoring/perf_context_imp.h"
12 #include "monitoring/statistics.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/merge_operator.h"
15 #include "rocksdb/statistics.h"
17 namespace ROCKSDB_NAMESPACE
{
21 void appendToReplayLog(std::string
* replay_log
, ValueType type
, Slice value
) {
24 if (replay_log
->empty()) {
25 // Optimization: in the common case of only one operation in the
26 // log, we allocate the exact amount of space needed.
27 replay_log
->reserve(1 + VarintLength(value
.size()) + value
.size());
29 replay_log
->push_back(type
);
30 PutLengthPrefixedSlice(replay_log
, value
);
36 #endif // ROCKSDB_LITE
41 GetContext::GetContext(
42 const Comparator
* ucmp
, const MergeOperator
* merge_operator
, Logger
* logger
,
43 Statistics
* statistics
, GetState init_state
, const Slice
& user_key
,
44 PinnableSlice
* pinnable_val
, std::string
* timestamp
, bool* value_found
,
45 MergeContext
* merge_context
, bool do_merge
,
46 SequenceNumber
* _max_covering_tombstone_seq
, Env
* env
, SequenceNumber
* seq
,
47 PinnedIteratorsManager
* _pinned_iters_mgr
, ReadCallback
* callback
,
48 bool* is_blob_index
, uint64_t tracing_get_id
)
50 merge_operator_(merge_operator
),
52 statistics_(statistics
),
55 pinnable_val_(pinnable_val
),
56 timestamp_(timestamp
),
57 value_found_(value_found
),
58 merge_context_(merge_context
),
59 max_covering_tombstone_seq_(_max_covering_tombstone_seq
),
63 pinned_iters_mgr_(_pinned_iters_mgr
),
66 is_blob_index_(is_blob_index
),
67 tracing_get_id_(tracing_get_id
) {
69 *seq_
= kMaxSequenceNumber
;
71 sample_
= should_sample_file_read();
74 GetContext::GetContext(
75 const Comparator
* ucmp
, const MergeOperator
* merge_operator
, Logger
* logger
,
76 Statistics
* statistics
, GetState init_state
, const Slice
& user_key
,
77 PinnableSlice
* pinnable_val
, bool* value_found
, MergeContext
* merge_context
,
78 bool do_merge
, SequenceNumber
* _max_covering_tombstone_seq
, Env
* env
,
79 SequenceNumber
* seq
, PinnedIteratorsManager
* _pinned_iters_mgr
,
80 ReadCallback
* callback
, bool* is_blob_index
, uint64_t tracing_get_id
)
81 : GetContext(ucmp
, merge_operator
, logger
, statistics
, init_state
, user_key
,
82 pinnable_val
, nullptr, value_found
, merge_context
, do_merge
,
83 _max_covering_tombstone_seq
, env
, seq
, _pinned_iters_mgr
,
84 callback
, is_blob_index
, tracing_get_id
) {}
// Called from TableCache::Get and Table::Get when the file/block in which the
// key may exist is not there in the TableCache/BlockCache respectively. In
// this case we can't guarantee that the key does not exist and are not
// permitted to do IO to be certain. Set status=kFound and value_found=false
// to let the caller know that the key may exist but is not there in memory.
91 void GetContext::MarkKeyMayExist() {
93 if (value_found_
!= nullptr) {
94 *value_found_
= false;
98 void GetContext::SaveValue(const Slice
& value
, SequenceNumber
/*seq*/) {
99 assert(state_
== kNotFound
);
100 appendToReplayLog(replay_log_
, kTypeValue
, value
);
103 if (LIKELY(pinnable_val_
!= nullptr)) {
104 pinnable_val_
->PinSelf(value
);
108 void GetContext::ReportCounters() {
109 if (get_context_stats_
.num_cache_hit
> 0) {
110 RecordTick(statistics_
, BLOCK_CACHE_HIT
, get_context_stats_
.num_cache_hit
);
112 if (get_context_stats_
.num_cache_index_hit
> 0) {
113 RecordTick(statistics_
, BLOCK_CACHE_INDEX_HIT
,
114 get_context_stats_
.num_cache_index_hit
);
116 if (get_context_stats_
.num_cache_data_hit
> 0) {
117 RecordTick(statistics_
, BLOCK_CACHE_DATA_HIT
,
118 get_context_stats_
.num_cache_data_hit
);
120 if (get_context_stats_
.num_cache_filter_hit
> 0) {
121 RecordTick(statistics_
, BLOCK_CACHE_FILTER_HIT
,
122 get_context_stats_
.num_cache_filter_hit
);
124 if (get_context_stats_
.num_cache_compression_dict_hit
> 0) {
125 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_HIT
,
126 get_context_stats_
.num_cache_compression_dict_hit
);
128 if (get_context_stats_
.num_cache_index_miss
> 0) {
129 RecordTick(statistics_
, BLOCK_CACHE_INDEX_MISS
,
130 get_context_stats_
.num_cache_index_miss
);
132 if (get_context_stats_
.num_cache_filter_miss
> 0) {
133 RecordTick(statistics_
, BLOCK_CACHE_FILTER_MISS
,
134 get_context_stats_
.num_cache_filter_miss
);
136 if (get_context_stats_
.num_cache_data_miss
> 0) {
137 RecordTick(statistics_
, BLOCK_CACHE_DATA_MISS
,
138 get_context_stats_
.num_cache_data_miss
);
140 if (get_context_stats_
.num_cache_compression_dict_miss
> 0) {
141 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_MISS
,
142 get_context_stats_
.num_cache_compression_dict_miss
);
144 if (get_context_stats_
.num_cache_bytes_read
> 0) {
145 RecordTick(statistics_
, BLOCK_CACHE_BYTES_READ
,
146 get_context_stats_
.num_cache_bytes_read
);
148 if (get_context_stats_
.num_cache_miss
> 0) {
149 RecordTick(statistics_
, BLOCK_CACHE_MISS
,
150 get_context_stats_
.num_cache_miss
);
152 if (get_context_stats_
.num_cache_add
> 0) {
153 RecordTick(statistics_
, BLOCK_CACHE_ADD
, get_context_stats_
.num_cache_add
);
155 if (get_context_stats_
.num_cache_add_redundant
> 0) {
156 RecordTick(statistics_
, BLOCK_CACHE_ADD_REDUNDANT
,
157 get_context_stats_
.num_cache_add_redundant
);
159 if (get_context_stats_
.num_cache_bytes_write
> 0) {
160 RecordTick(statistics_
, BLOCK_CACHE_BYTES_WRITE
,
161 get_context_stats_
.num_cache_bytes_write
);
163 if (get_context_stats_
.num_cache_index_add
> 0) {
164 RecordTick(statistics_
, BLOCK_CACHE_INDEX_ADD
,
165 get_context_stats_
.num_cache_index_add
);
167 if (get_context_stats_
.num_cache_index_add_redundant
> 0) {
168 RecordTick(statistics_
, BLOCK_CACHE_INDEX_ADD_REDUNDANT
,
169 get_context_stats_
.num_cache_index_add_redundant
);
171 if (get_context_stats_
.num_cache_index_bytes_insert
> 0) {
172 RecordTick(statistics_
, BLOCK_CACHE_INDEX_BYTES_INSERT
,
173 get_context_stats_
.num_cache_index_bytes_insert
);
175 if (get_context_stats_
.num_cache_data_add
> 0) {
176 RecordTick(statistics_
, BLOCK_CACHE_DATA_ADD
,
177 get_context_stats_
.num_cache_data_add
);
179 if (get_context_stats_
.num_cache_data_add_redundant
> 0) {
180 RecordTick(statistics_
, BLOCK_CACHE_DATA_ADD_REDUNDANT
,
181 get_context_stats_
.num_cache_data_add_redundant
);
183 if (get_context_stats_
.num_cache_data_bytes_insert
> 0) {
184 RecordTick(statistics_
, BLOCK_CACHE_DATA_BYTES_INSERT
,
185 get_context_stats_
.num_cache_data_bytes_insert
);
187 if (get_context_stats_
.num_cache_filter_add
> 0) {
188 RecordTick(statistics_
, BLOCK_CACHE_FILTER_ADD
,
189 get_context_stats_
.num_cache_filter_add
);
191 if (get_context_stats_
.num_cache_filter_add_redundant
> 0) {
192 RecordTick(statistics_
, BLOCK_CACHE_FILTER_ADD_REDUNDANT
,
193 get_context_stats_
.num_cache_filter_add_redundant
);
195 if (get_context_stats_
.num_cache_filter_bytes_insert
> 0) {
196 RecordTick(statistics_
, BLOCK_CACHE_FILTER_BYTES_INSERT
,
197 get_context_stats_
.num_cache_filter_bytes_insert
);
199 if (get_context_stats_
.num_cache_compression_dict_add
> 0) {
200 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_ADD
,
201 get_context_stats_
.num_cache_compression_dict_add
);
203 if (get_context_stats_
.num_cache_compression_dict_add_redundant
> 0) {
204 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT
,
205 get_context_stats_
.num_cache_compression_dict_add_redundant
);
207 if (get_context_stats_
.num_cache_compression_dict_bytes_insert
> 0) {
208 RecordTick(statistics_
, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT
,
209 get_context_stats_
.num_cache_compression_dict_bytes_insert
);
213 bool GetContext::SaveValue(const ParsedInternalKey
& parsed_key
,
214 const Slice
& value
, bool* matched
,
215 Cleanable
* value_pinner
) {
217 assert((state_
!= kMerge
&& parsed_key
.type
!= kTypeMerge
) ||
218 merge_context_
!= nullptr);
219 if (ucmp_
->CompareWithoutTimestamp(parsed_key
.user_key
, user_key_
) == 0) {
221 // If the value is not in the snapshot, skip it
222 if (!CheckCallback(parsed_key
.sequence
)) {
223 return true; // to continue to the next seq
226 appendToReplayLog(replay_log_
, parsed_key
.type
, value
);
228 if (seq_
!= nullptr) {
229 // Set the sequence number if it is uninitialized
230 if (*seq_
== kMaxSequenceNumber
) {
231 *seq_
= parsed_key
.sequence
;
235 auto type
= parsed_key
.type
;
236 // Key matches. Process it
237 if ((type
== kTypeValue
|| type
== kTypeMerge
|| type
== kTypeBlobIndex
) &&
238 max_covering_tombstone_seq_
!= nullptr &&
239 *max_covering_tombstone_seq_
> parsed_key
.sequence
) {
240 type
= kTypeRangeDeletion
;
245 assert(state_
== kNotFound
|| state_
== kMerge
);
246 if (type
== kTypeBlobIndex
&& is_blob_index_
== nullptr) {
247 // Blob value not supported. Stop.
248 state_
= kUnexpectedBlobIndex
;
251 if (kNotFound
== state_
) {
254 if (LIKELY(pinnable_val_
!= nullptr)) {
255 if (LIKELY(value_pinner
!= nullptr)) {
256 // If the backing resources for the value are provided, pin them
257 pinnable_val_
->PinSlice(value
, value_pinner
);
259 TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
262 // Otherwise copy the value
263 pinnable_val_
->PinSelf(value
);
267 // It means this function is called as part of DB GetMergeOperands
268 // API and the current value should be part of
269 // merge_context_->operand_list
270 push_operand(value
, value_pinner
);
272 } else if (kMerge
== state_
) {
273 assert(merge_operator_
!= nullptr);
276 if (LIKELY(pinnable_val_
!= nullptr)) {
277 Status merge_status
= MergeHelper::TimedFullMerge(
278 merge_operator_
, user_key_
, &value
,
279 merge_context_
->GetOperands(), pinnable_val_
->GetSelf(),
280 logger_
, statistics_
, env_
);
281 pinnable_val_
->PinSelf();
282 if (!merge_status
.ok()) {
287 // It means this function is called as part of DB GetMergeOperands
288 // API and the current value should be part of
289 // merge_context_->operand_list
290 push_operand(value
, value_pinner
);
293 if (state_
== kFound
) {
294 size_t ts_sz
= ucmp_
->timestamp_size();
295 if (ts_sz
> 0 && timestamp_
!= nullptr) {
296 Slice ts
= ExtractTimestampFromUserKey(parsed_key
.user_key
, ts_sz
);
297 timestamp_
->assign(ts
.data(), ts
.size());
300 if (is_blob_index_
!= nullptr) {
301 *is_blob_index_
= (type
== kTypeBlobIndex
);
306 case kTypeDeletionWithTimestamp
:
307 case kTypeSingleDeletion
:
308 case kTypeRangeDeletion
:
309 // TODO(noetzli): Verify correctness once merge of single-deletes
311 assert(state_
== kNotFound
|| state_
== kMerge
);
312 if (kNotFound
== state_
) {
314 } else if (kMerge
== state_
) {
316 if (LIKELY(pinnable_val_
!= nullptr)) {
318 Status merge_status
= MergeHelper::TimedFullMerge(
319 merge_operator_
, user_key_
, nullptr,
320 merge_context_
->GetOperands(), pinnable_val_
->GetSelf(),
321 logger_
, statistics_
, env_
);
322 pinnable_val_
->PinSelf();
323 if (!merge_status
.ok()) {
327 // If do_merge_ = false then the current value shouldn't be part of
328 // merge_context_->operand_list
334 assert(state_
== kNotFound
|| state_
== kMerge
);
336 // value_pinner is not set from plain_table_reader.cc for example.
337 push_operand(value
, value_pinner
);
338 if (do_merge_
&& merge_operator_
!= nullptr &&
339 merge_operator_
->ShouldMerge(
340 merge_context_
->GetOperandsDirectionBackward())) {
342 if (LIKELY(pinnable_val_
!= nullptr)) {
343 // do_merge_ = true this is the case where this function is called
344 // as part of DB Get API hence merge operators should be merged.
346 Status merge_status
= MergeHelper::TimedFullMerge(
347 merge_operator_
, user_key_
, nullptr,
348 merge_context_
->GetOperands(), pinnable_val_
->GetSelf(),
349 logger_
, statistics_
, env_
);
350 pinnable_val_
->PinSelf();
351 if (!merge_status
.ok()) {
366 // state_ could be Corrupt, merge or notfound
370 void GetContext::push_operand(const Slice
& value
, Cleanable
* value_pinner
) {
371 if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
372 value_pinner
!= nullptr) {
373 value_pinner
->DelegateCleanupsTo(pinned_iters_mgr());
374 merge_context_
->PushOperand(value
, true /*value_pinned*/);
376 merge_context_
->PushOperand(value
, false);
380 void replayGetContextLog(const Slice
& replay_log
, const Slice
& user_key
,
381 GetContext
* get_context
, Cleanable
* value_pinner
) {
383 Slice s
= replay_log
;
385 auto type
= static_cast<ValueType
>(*s
.data());
388 bool ret
= GetLengthPrefixedSlice(&s
, &value
);
392 bool dont_care
__attribute__((__unused__
));
393 // Since SequenceNumber is not stored and unknown, we will use
394 // kMaxSequenceNumber.
395 get_context
->SaveValue(
396 ParsedInternalKey(user_key
, kMaxSequenceNumber
, type
), value
,
397 &dont_care
, value_pinner
);
399 #else // ROCKSDB_LITE
405 #endif // ROCKSDB_LITE
408 } // namespace ROCKSDB_NAMESPACE