// generated by WriteUnprepared write policy is not mistakenly read by
// another.
kTypeBeginUnprepareXID = 0x13, // WAL only.
- kMaxValue = 0x7F // Not used for storing records.
+ kTypeDeletionWithTimestamp = 0x14,
+ kMaxValue = 0x7F // Not used for storing records.
};
// Defined in dbformat.cc
// Checks whether a type is an inline value type
// (i.e. a type used in memtable skiplist and sst file datablock).
inline bool IsValueType(ValueType t) {
- return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex;
+ return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex ||
+ kTypeDeletionWithTimestamp == t;
}
// Checks whether a type is from user operation
static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
+constexpr uint64_t kNumInternalBytes = 8;
+
// The data structure that represents an internal key in the way that user_key,
// sequence number and type are stored in separated forms.
struct ParsedInternalKey {
ValueType type;
ParsedInternalKey()
- : sequence(kMaxSequenceNumber) // Make code analyzer happy
- {} // Intentionally left uninitialized (for speed)
+ : sequence(kMaxSequenceNumber),
+ type(kTypeDeletion) // Make code analyzer happy
+ {} // Intentionally left uninitialized (for speed)
+ // u contains timestamp if user timestamp feature is enabled.
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) {}
- std::string DebugString(bool hex = false) const;
+ std::string DebugString(bool log_err_key, bool hex) const;
void clear() {
user_key.clear();
sequence = 0;
type = kTypeDeletion;
}
+
+ void SetTimestamp(const Slice& ts) {
+ assert(ts.size() <= user_key.size());
+ const char* addr = user_key.data() - ts.size();
+ memcpy(const_cast<char*>(addr), ts.data(), ts.size());
+ }
};
// Return the length of the encoding of "key".
inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
- return key.user_key.size() + 8;
+ return key.user_key.size() + kNumInternalBytes;
}
// Pack a sequence number and a ValueType into a uint64_t
-extern uint64_t PackSequenceAndType(uint64_t seq, ValueType t);
+inline uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
+ assert(seq <= kMaxSequenceNumber);
+ assert(IsExtendedValueType(t));
+ return (seq << 8) | t;
+}
// Given the result of PackSequenceAndType, store the sequence number in *seq
// and the ValueType in *t.
-extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t);
+inline void UnPackSequenceAndType(uint64_t packed, uint64_t* seq,
+ ValueType* t) {
+ *seq = packed >> 8;
+ *t = static_cast<ValueType>(packed & 0xff);
+
+ assert(*seq <= kMaxSequenceNumber);
+ assert(IsExtendedValueType(*t));
+}
EntryType GetEntryType(ValueType value_type);
// Append the serialization of "key" to *result.
extern void AppendInternalKey(std::string* result,
const ParsedInternalKey& key);
+
+// Append the serialization of "key" to *result, replacing the original
+// timestamp with argument ts.
+extern void AppendInternalKeyWithDifferentTimestamp(
+ std::string* result, const ParsedInternalKey& key, const Slice& ts);
+
// Serialized internal key consists of user key followed by footer.
// This function appends the footer to *result, assuming that *result already
// contains the user key at the end.
// stores the parsed data in "*result", and returns true.
//
// On error, returns false, leaves "*result" in an undefined state.
-extern bool ParseInternalKey(const Slice& internal_key,
- ParsedInternalKey* result);
+extern Status ParseInternalKey(const Slice& internal_key,
+ ParsedInternalKey* result, bool log_err_key);
// Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) {
- assert(internal_key.size() >= 8);
- return Slice(internal_key.data(), internal_key.size() - 8);
+ assert(internal_key.size() >= kNumInternalBytes);
+ return Slice(internal_key.data(), internal_key.size() - kNumInternalBytes);
}
inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key,
size_t ts_sz) {
- assert(internal_key.size() >= 8 + ts_sz);
- return Slice(internal_key.data(), internal_key.size() - 8 - ts_sz);
+ assert(internal_key.size() >= kNumInternalBytes + ts_sz);
+ return Slice(internal_key.data(),
+ internal_key.size() - kNumInternalBytes - ts_sz);
}
inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
return Slice(user_key.data(), user_key.size() - ts_sz);
}
+inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
+ assert(user_key.size() >= ts_sz);
+ return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz);
+}
+
inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
- assert(internal_key.size() >= 8);
+ assert(internal_key.size() >= kNumInternalBytes);
const size_t n = internal_key.size();
- return DecodeFixed64(internal_key.data() + n - 8);
+ return DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
}
inline ValueType ExtractValueType(const Slice& internal_key) {
std::string name_;
public:
- explicit InternalKeyComparator(const Comparator* c)
- : user_comparator_(c),
- name_("rocksdb.InternalKeyComparator:" +
- std::string(user_comparator_.Name())) {}
+ // `InternalKeyComparator`s constructed with the default constructor are not
+ // usable and will segfault on any attempt to use them for comparisons.
+ InternalKeyComparator() = default;
+
+ // @param named If true, assign a name to this comparator based on the
+ // underlying comparator's name. This involves an allocation and copy in
+ // this constructor to precompute the result of `Name()`. To avoid this
+ // overhead, set `named` to false. In that case, `Name()` will return a
+ // generic name that is non-specific to the underlying comparator.
+ explicit InternalKeyComparator(const Comparator* c, bool named = true)
+ : Comparator(c->timestamp_size()), user_comparator_(c) {
+ if (named) {
+ name_ = "rocksdb.InternalKeyComparator:" +
+ std::string(user_comparator_.Name());
+ }
+ }
virtual ~InternalKeyComparator() {}
virtual const char* Name() const override;
int Compare(const InternalKey& a, const InternalKey& b) const;
int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const;
+ // In this `Compare()` overload, the sequence numbers provided in
+ // `a_global_seqno` and `b_global_seqno` override the sequence numbers in `a`
+ // and `b`, respectively. To disable sequence number override(s), provide the
+ // value `kDisableGlobalSequenceNumber`.
+ int Compare(const Slice& a, SequenceNumber a_global_seqno, const Slice& b,
+ SequenceNumber b_global_seqno) const;
virtual const Comparator* GetRootComparator() const override {
return user_comparator_.GetRootComparator();
}
bool Valid() const {
ParsedInternalKey parsed;
- return ParseInternalKey(Slice(rep_), &parsed);
+ return (ParseInternalKey(Slice(rep_), &parsed, false /* log_err_key */)
+ .ok()); // TODO
}
void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
AppendInternalKeyFooter(&rep_, s, t);
}
- std::string DebugString(bool hex = false) const;
+ std::string DebugString(bool hex) const;
};
inline int InternalKeyComparator::Compare(const InternalKey& a,
return Compare(a.Encode(), b.Encode());
}
-inline bool ParseInternalKey(const Slice& internal_key,
- ParsedInternalKey* result) {
+inline Status ParseInternalKey(const Slice& internal_key,
+ ParsedInternalKey* result, bool log_err_key) {
const size_t n = internal_key.size();
- if (n < 8) return false;
- uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+
+ if (n < kNumInternalBytes) {
+ return Status::Corruption("Corrupted Key: Internal Key too small. Size=" +
+ std::to_string(n) + ". ");
+ }
+
+ uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
unsigned char c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
assert(result->type <= ValueType::kMaxValue);
- result->user_key = Slice(internal_key.data(), n - 8);
- return IsExtendedValueType(result->type);
+ result->user_key = Slice(internal_key.data(), n - kNumInternalBytes);
+
+ if (IsExtendedValueType(result->type)) {
+ return Status::OK();
+ } else {
+ return Status::Corruption("Corrupted Key",
+ result->DebugString(log_err_key, true));
+ }
}
// Update the sequence number in the internal key.
// Guarantees not to invalidate ikey.data().
inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) {
size_t ikey_sz = ikey->size();
- assert(ikey_sz >= 8);
+ assert(ikey_sz >= kNumInternalBytes);
uint64_t newval = (seq << 8) | t;
// Note: Since C++11, strings are guaranteed to be stored contiguously and
// string::operator[]() is guaranteed not to change ikey.data().
- EncodeFixed64(&(*ikey)[ikey_sz - 8], newval);
+ EncodeFixed64(&(*ikey)[ikey_sz - kNumInternalBytes], newval);
}
// Get the sequence number from the internal key
inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
const size_t n = internal_key.size();
- assert(n >= 8);
- uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+ assert(n >= kNumInternalBytes);
+ uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
return num >> 8;
}
if (IsUserKey()) {
return Slice(key_, key_size_);
} else {
- assert(key_size_ >= 8);
- return Slice(key_, key_size_ - 8);
+ assert(key_size_ >= kNumInternalBytes);
+ return Slice(key_, key_size_ - kNumInternalBytes);
}
}
// and returns a Slice referencing the new copy.
Slice SetInternalKey(const Slice& key, ParsedInternalKey* ikey) {
size_t key_n = key.size();
- assert(key_n >= 8);
+ assert(key_n >= kNumInternalBytes);
SetInternalKey(key);
- ikey->user_key = Slice(key_, key_n - 8);
+ ikey->user_key = Slice(key_, key_n - kNumInternalBytes);
return Slice(key_, key_n);
}
// Update the sequence number in the internal key. Guarantees not to
// invalidate slices to the key (and the user key).
- void UpdateInternalKey(uint64_t seq, ValueType t) {
+ void UpdateInternalKey(uint64_t seq, ValueType t, const Slice* ts = nullptr) {
assert(!IsKeyPinned());
- assert(key_size_ >= 8);
+ assert(key_size_ >= kNumInternalBytes);
+ if (ts) {
+ assert(key_size_ >= kNumInternalBytes + ts->size());
+ memcpy(&buf_[key_size_ - kNumInternalBytes - ts->size()], ts->data(),
+ ts->size());
+ }
uint64_t newval = (seq << 8) | t;
- EncodeFixed64(&buf_[key_size_ - 8], newval);
+ EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval);
}
bool IsKeyPinned() const { return (key_ != buf_); }
void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
SequenceNumber s,
- ValueType value_type = kValueTypeForSeek) {
+ ValueType value_type = kValueTypeForSeek,
+ const Slice* ts = nullptr) {
size_t psize = key_prefix.size();
size_t usize = user_key.size();
- EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
+ size_t ts_sz = (ts != nullptr ? ts->size() : 0);
+ EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t) + ts_sz);
if (psize > 0) {
memcpy(buf_, key_prefix.data(), psize);
}
memcpy(buf_ + psize, user_key.data(), usize);
- EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type));
+ if (ts) {
+ memcpy(buf_ + psize + usize, ts->data(), ts_sz);
+ }
+ EncodeFixed64(buf_ + usize + psize + ts_sz,
+ PackSequenceAndType(s, value_type));
key_ = buf_;
- key_size_ = psize + usize + sizeof(uint64_t);
+ key_size_ = psize + usize + sizeof(uint64_t) + ts_sz;
is_user_key_ = false;
}
void SetInternalKey(const Slice& user_key, SequenceNumber s,
- ValueType value_type = kValueTypeForSeek) {
- SetInternalKey(Slice(), user_key, s, value_type);
+ ValueType value_type = kValueTypeForSeek,
+ const Slice* ts = nullptr) {
+ SetInternalKey(Slice(), user_key, s, value_type, ts);
}
void Reserve(size_t size) {
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
- const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
- const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
+ const uint64_t anum =
+ DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes);
+ const uint64_t bnum =
+ DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
// Shift the number to exclude the last byte which contains the value type
- const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8;
- const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8;
+ const uint64_t anum =
+ DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes) >> 8;
+ const uint64_t bnum =
+ DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes) >> 8;
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
return r;
}
+inline int InternalKeyComparator::Compare(const Slice& a,
+ SequenceNumber a_global_seqno,
+ const Slice& b,
+ SequenceNumber b_global_seqno) const {
+ int r = user_comparator_.Compare(ExtractUserKey(a), ExtractUserKey(b));
+ if (r == 0) {
+ uint64_t a_footer, b_footer;
+ if (a_global_seqno == kDisableGlobalSequenceNumber) {
+ a_footer = ExtractInternalKeyFooter(a);
+ } else {
+ a_footer = PackSequenceAndType(a_global_seqno, ExtractValueType(a));
+ }
+ if (b_global_seqno == kDisableGlobalSequenceNumber) {
+ b_footer = ExtractInternalKeyFooter(b);
+ } else {
+ b_footer = PackSequenceAndType(b_global_seqno, ExtractValueType(b));
+ }
+ if (a_footer > b_footer) {
+ r = -1;
+ } else if (a_footer < b_footer) {
+ r = +1;
+ }
+ }
+ return r;
+}
+
// Wrap InternalKeyComparator as a comparator class for ParsedInternalKey.
struct ParsedInternalKeyComparator {
explicit ParsedInternalKeyComparator(const InternalKeyComparator* c)