]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/table/get_context.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / table / get_context.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "table/get_context.h"
7
8 #include "db/blob//blob_fetcher.h"
9 #include "db/merge_helper.h"
10 #include "db/pinned_iterators_manager.h"
11 #include "db/read_callback.h"
12 #include "db/wide/wide_column_serialization.h"
13 #include "monitoring/file_read_sample.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "monitoring/statistics.h"
16 #include "rocksdb/merge_operator.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/system_clock.h"
19
20 namespace ROCKSDB_NAMESPACE {
21
22 namespace {
23
24 void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
25 #ifndef ROCKSDB_LITE
26 if (replay_log) {
27 if (replay_log->empty()) {
28 // Optimization: in the common case of only one operation in the
29 // log, we allocate the exact amount of space needed.
30 replay_log->reserve(1 + VarintLength(value.size()) + value.size());
31 }
32 replay_log->push_back(type);
33 PutLengthPrefixedSlice(replay_log, value);
34 }
35 #else
36 (void)replay_log;
37 (void)type;
38 (void)value;
39 #endif // ROCKSDB_LITE
40 }
41
42 } // namespace
43
44 GetContext::GetContext(
45 const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
46 Statistics* statistics, GetState init_state, const Slice& user_key,
47 PinnableSlice* pinnable_val, PinnableWideColumns* columns,
48 std::string* timestamp, bool* value_found, MergeContext* merge_context,
49 bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
50 SystemClock* clock, SequenceNumber* seq,
51 PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
52 bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
53 : ucmp_(ucmp),
54 merge_operator_(merge_operator),
55 logger_(logger),
56 statistics_(statistics),
57 state_(init_state),
58 user_key_(user_key),
59 pinnable_val_(pinnable_val),
60 columns_(columns),
61 timestamp_(timestamp),
62 value_found_(value_found),
63 merge_context_(merge_context),
64 max_covering_tombstone_seq_(_max_covering_tombstone_seq),
65 clock_(clock),
66 seq_(seq),
67 replay_log_(nullptr),
68 pinned_iters_mgr_(_pinned_iters_mgr),
69 callback_(callback),
70 do_merge_(do_merge),
71 is_blob_index_(is_blob_index),
72 tracing_get_id_(tracing_get_id),
73 blob_fetcher_(blob_fetcher) {
74 if (seq_) {
75 *seq_ = kMaxSequenceNumber;
76 }
77 sample_ = should_sample_file_read();
78 }
79
80 GetContext::GetContext(const Comparator* ucmp,
81 const MergeOperator* merge_operator, Logger* logger,
82 Statistics* statistics, GetState init_state,
83 const Slice& user_key, PinnableSlice* pinnable_val,
84 PinnableWideColumns* columns, bool* value_found,
85 MergeContext* merge_context, bool do_merge,
86 SequenceNumber* _max_covering_tombstone_seq,
87 SystemClock* clock, SequenceNumber* seq,
88 PinnedIteratorsManager* _pinned_iters_mgr,
89 ReadCallback* callback, bool* is_blob_index,
90 uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
91 : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
92 pinnable_val, columns, /*timestamp=*/nullptr, value_found,
93 merge_context, do_merge, _max_covering_tombstone_seq, clock,
94 seq, _pinned_iters_mgr, callback, is_blob_index,
95 tracing_get_id, blob_fetcher) {}
96
97 // Called from TableCache::Get and Table::Get when file/block in which
98 // key may exist are not there in TableCache/BlockCache respectively. In this
99 // case we can't guarantee that key does not exist and are not permitted to do
100 // IO to be certain.Set the status=kFound and value_found=false to let the
101 // caller know that key may exist but is not there in memory
102 void GetContext::MarkKeyMayExist() {
103 state_ = kFound;
104 if (value_found_ != nullptr) {
105 *value_found_ = false;
106 }
107 }
108
109 void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
110 assert(state_ == kNotFound);
111 appendToReplayLog(replay_log_, kTypeValue, value);
112
113 state_ = kFound;
114 if (LIKELY(pinnable_val_ != nullptr)) {
115 pinnable_val_->PinSelf(value);
116 }
117 }
118
119 void GetContext::ReportCounters() {
120 if (get_context_stats_.num_cache_hit > 0) {
121 RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
122 }
123 if (get_context_stats_.num_cache_index_hit > 0) {
124 RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
125 get_context_stats_.num_cache_index_hit);
126 }
127 if (get_context_stats_.num_cache_data_hit > 0) {
128 RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
129 get_context_stats_.num_cache_data_hit);
130 }
131 if (get_context_stats_.num_cache_filter_hit > 0) {
132 RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
133 get_context_stats_.num_cache_filter_hit);
134 }
135 if (get_context_stats_.num_cache_compression_dict_hit > 0) {
136 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
137 get_context_stats_.num_cache_compression_dict_hit);
138 }
139 if (get_context_stats_.num_cache_index_miss > 0) {
140 RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
141 get_context_stats_.num_cache_index_miss);
142 }
143 if (get_context_stats_.num_cache_filter_miss > 0) {
144 RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
145 get_context_stats_.num_cache_filter_miss);
146 }
147 if (get_context_stats_.num_cache_data_miss > 0) {
148 RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
149 get_context_stats_.num_cache_data_miss);
150 }
151 if (get_context_stats_.num_cache_compression_dict_miss > 0) {
152 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
153 get_context_stats_.num_cache_compression_dict_miss);
154 }
155 if (get_context_stats_.num_cache_bytes_read > 0) {
156 RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
157 get_context_stats_.num_cache_bytes_read);
158 }
159 if (get_context_stats_.num_cache_miss > 0) {
160 RecordTick(statistics_, BLOCK_CACHE_MISS,
161 get_context_stats_.num_cache_miss);
162 }
163 if (get_context_stats_.num_cache_add > 0) {
164 RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
165 }
166 if (get_context_stats_.num_cache_add_redundant > 0) {
167 RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
168 get_context_stats_.num_cache_add_redundant);
169 }
170 if (get_context_stats_.num_cache_bytes_write > 0) {
171 RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
172 get_context_stats_.num_cache_bytes_write);
173 }
174 if (get_context_stats_.num_cache_index_add > 0) {
175 RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
176 get_context_stats_.num_cache_index_add);
177 }
178 if (get_context_stats_.num_cache_index_add_redundant > 0) {
179 RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
180 get_context_stats_.num_cache_index_add_redundant);
181 }
182 if (get_context_stats_.num_cache_index_bytes_insert > 0) {
183 RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
184 get_context_stats_.num_cache_index_bytes_insert);
185 }
186 if (get_context_stats_.num_cache_data_add > 0) {
187 RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
188 get_context_stats_.num_cache_data_add);
189 }
190 if (get_context_stats_.num_cache_data_add_redundant > 0) {
191 RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
192 get_context_stats_.num_cache_data_add_redundant);
193 }
194 if (get_context_stats_.num_cache_data_bytes_insert > 0) {
195 RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
196 get_context_stats_.num_cache_data_bytes_insert);
197 }
198 if (get_context_stats_.num_cache_filter_add > 0) {
199 RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
200 get_context_stats_.num_cache_filter_add);
201 }
202 if (get_context_stats_.num_cache_filter_add_redundant > 0) {
203 RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
204 get_context_stats_.num_cache_filter_add_redundant);
205 }
206 if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
207 RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
208 get_context_stats_.num_cache_filter_bytes_insert);
209 }
210 if (get_context_stats_.num_cache_compression_dict_add > 0) {
211 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
212 get_context_stats_.num_cache_compression_dict_add);
213 }
214 if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
215 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
216 get_context_stats_.num_cache_compression_dict_add_redundant);
217 }
218 if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
219 RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
220 get_context_stats_.num_cache_compression_dict_bytes_insert);
221 }
222 }
223
224 bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
225 const Slice& value, bool* matched,
226 Cleanable* value_pinner) {
227 assert(matched);
228 assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
229 merge_context_ != nullptr);
230 if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
231 *matched = true;
232 // If the value is not in the snapshot, skip it
233 if (!CheckCallback(parsed_key.sequence)) {
234 return true; // to continue to the next seq
235 }
236
237 appendToReplayLog(replay_log_, parsed_key.type, value);
238
239 if (seq_ != nullptr) {
240 // Set the sequence number if it is uninitialized
241 if (*seq_ == kMaxSequenceNumber) {
242 *seq_ = parsed_key.sequence;
243 }
244 if (max_covering_tombstone_seq_) {
245 *seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
246 }
247 }
248
249 size_t ts_sz = ucmp_->timestamp_size();
250 if (ts_sz > 0 && timestamp_ != nullptr) {
251 if (!timestamp_->empty()) {
252 assert(ts_sz == timestamp_->size());
253 // `timestamp` can be set before `SaveValue` is ever called
254 // when max_covering_tombstone_seq_ was set.
255 // If this key has a higher sequence number than range tombstone,
256 // then timestamp should be updated. `ts_from_rangetombstone_` is
257 // set to false afterwards so that only the key with highest seqno
258 // updates the timestamp.
259 if (ts_from_rangetombstone_) {
260 assert(max_covering_tombstone_seq_);
261 if (parsed_key.sequence > *max_covering_tombstone_seq_) {
262 Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
263 timestamp_->assign(ts.data(), ts.size());
264 ts_from_rangetombstone_ = false;
265 }
266 }
267 }
268 // TODO optimize for small size ts
269 const std::string kMaxTs(ts_sz, '\xff');
270 if (timestamp_->empty() ||
271 ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
272 Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
273 timestamp_->assign(ts.data(), ts.size());
274 }
275 }
276
277 auto type = parsed_key.type;
278 // Key matches. Process it
279 if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
280 type == kTypeWideColumnEntity || type == kTypeDeletion ||
281 type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
282 max_covering_tombstone_seq_ != nullptr &&
283 *max_covering_tombstone_seq_ > parsed_key.sequence) {
284 // Note that deletion types are also considered, this is for the case
285 // when we need to return timestamp to user. If a range tombstone has a
286 // higher seqno than point tombstone, its timestamp should be returned.
287 type = kTypeRangeDeletion;
288 }
289 switch (type) {
290 case kTypeValue:
291 case kTypeBlobIndex:
292 case kTypeWideColumnEntity:
293 assert(state_ == kNotFound || state_ == kMerge);
294 if (type == kTypeBlobIndex) {
295 if (is_blob_index_ == nullptr) {
296 // Blob value not supported. Stop.
297 state_ = kUnexpectedBlobIndex;
298 return false;
299 }
300 }
301
302 if (is_blob_index_ != nullptr) {
303 *is_blob_index_ = (type == kTypeBlobIndex);
304 }
305
306 if (kNotFound == state_) {
307 state_ = kFound;
308 if (do_merge_) {
309 if (LIKELY(pinnable_val_ != nullptr)) {
310 Slice value_to_use = value;
311
312 if (type == kTypeWideColumnEntity) {
313 Slice value_copy = value;
314
315 if (!WideColumnSerialization::GetValueOfDefaultColumn(
316 value_copy, value_to_use)
317 .ok()) {
318 state_ = kCorrupt;
319 return false;
320 }
321 }
322
323 if (LIKELY(value_pinner != nullptr)) {
324 // If the backing resources for the value are provided, pin them
325 pinnable_val_->PinSlice(value_to_use, value_pinner);
326 } else {
327 TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
328 this);
329 // Otherwise copy the value
330 pinnable_val_->PinSelf(value_to_use);
331 }
332 } else if (columns_ != nullptr) {
333 if (type == kTypeWideColumnEntity) {
334 if (!columns_->SetWideColumnValue(value, value_pinner).ok()) {
335 state_ = kCorrupt;
336 return false;
337 }
338 } else {
339 columns_->SetPlainValue(value, value_pinner);
340 }
341 }
342 } else {
343 // It means this function is called as part of DB GetMergeOperands
344 // API and the current value should be part of
345 // merge_context_->operand_list
346 if (type == kTypeBlobIndex) {
347 PinnableSlice pin_val;
348 if (GetBlobValue(value, &pin_val) == false) {
349 return false;
350 }
351 Slice blob_value(pin_val);
352 push_operand(blob_value, nullptr);
353 } else if (type == kTypeWideColumnEntity) {
354 Slice value_copy = value;
355 Slice value_of_default;
356
357 if (!WideColumnSerialization::GetValueOfDefaultColumn(
358 value_copy, value_of_default)
359 .ok()) {
360 state_ = kCorrupt;
361 return false;
362 }
363
364 push_operand(value_of_default, value_pinner);
365 } else {
366 assert(type == kTypeValue);
367 push_operand(value, value_pinner);
368 }
369 }
370 } else if (kMerge == state_) {
371 assert(merge_operator_ != nullptr);
372 if (type == kTypeBlobIndex) {
373 PinnableSlice pin_val;
374 if (GetBlobValue(value, &pin_val) == false) {
375 return false;
376 }
377 Slice blob_value(pin_val);
378 state_ = kFound;
379 if (do_merge_) {
380 Merge(&blob_value);
381 } else {
382 // It means this function is called as part of DB GetMergeOperands
383 // API and the current value should be part of
384 // merge_context_->operand_list
385 push_operand(blob_value, nullptr);
386 }
387 } else if (type == kTypeWideColumnEntity) {
388 state_ = kFound;
389
390 if (do_merge_) {
391 MergeWithEntity(value);
392 } else {
393 // It means this function is called as part of DB GetMergeOperands
394 // API and the current value should be part of
395 // merge_context_->operand_list
396 Slice value_copy = value;
397 Slice value_of_default;
398
399 if (!WideColumnSerialization::GetValueOfDefaultColumn(
400 value_copy, value_of_default)
401 .ok()) {
402 state_ = kCorrupt;
403 return false;
404 }
405
406 push_operand(value_of_default, value_pinner);
407 }
408 } else {
409 assert(type == kTypeValue);
410
411 state_ = kFound;
412 if (do_merge_) {
413 Merge(&value);
414 } else {
415 // It means this function is called as part of DB GetMergeOperands
416 // API and the current value should be part of
417 // merge_context_->operand_list
418 push_operand(value, value_pinner);
419 }
420 }
421 }
422 return false;
423
424 case kTypeDeletion:
425 case kTypeDeletionWithTimestamp:
426 case kTypeSingleDeletion:
427 case kTypeRangeDeletion:
428 // TODO(noetzli): Verify correctness once merge of single-deletes
429 // is supported
430 assert(state_ == kNotFound || state_ == kMerge);
431 if (kNotFound == state_) {
432 state_ = kDeleted;
433 } else if (kMerge == state_) {
434 state_ = kFound;
435 if (do_merge_) {
436 Merge(nullptr);
437 }
438 // If do_merge_ = false then the current value shouldn't be part of
439 // merge_context_->operand_list
440 }
441 return false;
442
443 case kTypeMerge:
444 assert(state_ == kNotFound || state_ == kMerge);
445 state_ = kMerge;
446 // value_pinner is not set from plain_table_reader.cc for example.
447 push_operand(value, value_pinner);
448 if (do_merge_ && merge_operator_ != nullptr &&
449 merge_operator_->ShouldMerge(
450 merge_context_->GetOperandsDirectionBackward())) {
451 state_ = kFound;
452 Merge(nullptr);
453 return false;
454 }
455 return true;
456
457 default:
458 assert(false);
459 break;
460 }
461 }
462
463 // state_ could be Corrupt, merge or notfound
464 return false;
465 }
466
467 void GetContext::Merge(const Slice* value) {
468 assert(do_merge_);
469 assert(!pinnable_val_ || !columns_);
470
471 std::string result;
472 const Status s = MergeHelper::TimedFullMerge(
473 merge_operator_, user_key_, value, merge_context_->GetOperands(), &result,
474 logger_, statistics_, clock_, /* result_operand */ nullptr,
475 /* update_num_ops_stats */ true);
476 if (!s.ok()) {
477 state_ = kCorrupt;
478 return;
479 }
480
481 if (LIKELY(pinnable_val_ != nullptr)) {
482 *(pinnable_val_->GetSelf()) = std::move(result);
483 pinnable_val_->PinSelf();
484 return;
485 }
486
487 assert(columns_);
488 columns_->SetPlainValue(result);
489 }
490
491 void GetContext::MergeWithEntity(Slice entity) {
492 assert(do_merge_);
493 assert(!pinnable_val_ || !columns_);
494
495 if (LIKELY(pinnable_val_ != nullptr)) {
496 Slice value_of_default;
497
498 {
499 const Status s = WideColumnSerialization::GetValueOfDefaultColumn(
500 entity, value_of_default);
501 if (!s.ok()) {
502 state_ = kCorrupt;
503 return;
504 }
505 }
506
507 {
508 const Status s = MergeHelper::TimedFullMerge(
509 merge_operator_, user_key_, &value_of_default,
510 merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_,
511 statistics_, clock_, /* result_operand */ nullptr,
512 /* update_num_ops_stats */ true);
513 if (!s.ok()) {
514 state_ = kCorrupt;
515 return;
516 }
517 }
518
519 pinnable_val_->PinSelf();
520 return;
521 }
522
523 std::string result;
524
525 {
526 const Status s = MergeHelper::TimedFullMergeWithEntity(
527 merge_operator_, user_key_, entity, merge_context_->GetOperands(),
528 &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true);
529 if (!s.ok()) {
530 state_ = kCorrupt;
531 return;
532 }
533 }
534
535 {
536 assert(columns_);
537 const Status s = columns_->SetWideColumnValue(result);
538 if (!s.ok()) {
539 state_ = kCorrupt;
540 return;
541 }
542 }
543 }
544
545 bool GetContext::GetBlobValue(const Slice& blob_index,
546 PinnableSlice* blob_value) {
547 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
548 constexpr uint64_t* bytes_read = nullptr;
549
550 Status status = blob_fetcher_->FetchBlob(
551 user_key_, blob_index, prefetch_buffer, blob_value, bytes_read);
552 if (!status.ok()) {
553 if (status.IsIncomplete()) {
554 // FIXME: this code is not covered by unit tests
555 MarkKeyMayExist();
556 return false;
557 }
558 state_ = kCorrupt;
559 return false;
560 }
561 *is_blob_index_ = false;
562 return true;
563 }
564
565 void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
566 // TODO(yanqin) preserve timestamps information in merge_context
567 if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
568 value_pinner != nullptr) {
569 value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
570 merge_context_->PushOperand(value, true /*value_pinned*/);
571 } else {
572 merge_context_->PushOperand(value, false);
573 }
574 }
575
576 void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
577 GetContext* get_context, Cleanable* value_pinner) {
578 #ifndef ROCKSDB_LITE
579 Slice s = replay_log;
580 while (s.size()) {
581 auto type = static_cast<ValueType>(*s.data());
582 s.remove_prefix(1);
583 Slice value;
584 bool ret = GetLengthPrefixedSlice(&s, &value);
585 assert(ret);
586 (void)ret;
587
588 bool dont_care __attribute__((__unused__));
589 // Since SequenceNumber is not stored and unknown, we will use
590 // kMaxSequenceNumber.
591 get_context->SaveValue(
592 ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
593 &dont_care, value_pinner);
594 }
595 #else // ROCKSDB_LITE
596 (void)replay_log;
597 (void)user_key;
598 (void)get_context;
599 (void)value_pinner;
600 assert(false);
601 #endif // ROCKSDB_LITE
602 }
603
604 } // namespace ROCKSDB_NAMESPACE