]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/dbformat.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / dbformat.h
index de98be8dfa0f14d8fdd6bafdd98d68adef3eec99..ae866a13610d17f0870a3a817c2c6205e7ef391f 100644 (file)
@@ -69,7 +69,8 @@ enum ValueType : unsigned char {
   // generated by WriteUnprepared write policy is not mistakenly read by
   // another.
   kTypeBeginUnprepareXID = 0x13,  // WAL only.
-  kMaxValue = 0x7F                // Not used for storing records.
+  kTypeDeletionWithTimestamp = 0x14,
+  kMaxValue = 0x7F  // Not used for storing records.
 };
 
 // Defined in dbformat.cc
@@ -79,7 +80,8 @@ extern const ValueType kValueTypeForSeekForPrev;
 // Checks whether a type is an inline value type
 // (i.e. a type used in memtable skiplist and sst file datablock).
 inline bool IsValueType(ValueType t) {
-  return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex;
+  return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex ||
+         kTypeDeletionWithTimestamp == t;
 }
 
 // Checks whether a type is from user operation
@@ -94,6 +96,8 @@ static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
 
 static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
 
+constexpr uint64_t kNumInternalBytes = 8;
+
 // The data structure that represents an internal key in the way that user_key,
 // sequence number and type are stored in separated forms.
 struct ParsedInternalKey {
@@ -102,36 +106,61 @@ struct ParsedInternalKey {
   ValueType type;
 
   ParsedInternalKey()
-      : sequence(kMaxSequenceNumber)  // Make code analyzer happy
-  {}  // Intentionally left uninitialized (for speed)
+      : sequence(kMaxSequenceNumber),
+        type(kTypeDeletion)  // Make code analyzer happy
+  {}                         // Intentionally left uninitialized (for speed)
+  // u contains timestamp if user timestamp feature is enabled.
   ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
       : user_key(u), sequence(seq), type(t) {}
-  std::string DebugString(bool hex = false) const;
+  std::string DebugString(bool log_err_key, bool hex) const;
 
   void clear() {
     user_key.clear();
     sequence = 0;
     type = kTypeDeletion;
   }
+
+  void SetTimestamp(const Slice& ts) {
+    assert(ts.size() <= user_key.size());
+    const char* addr = user_key.data() - ts.size();
+    memcpy(const_cast<char*>(addr), ts.data(), ts.size());
+  }
 };
 
 // Return the length of the encoding of "key".
 inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
-  return key.user_key.size() + 8;
+  return key.user_key.size() + kNumInternalBytes;
 }
 
 // Pack a sequence number and a ValueType into a uint64_t
-extern uint64_t PackSequenceAndType(uint64_t seq, ValueType t);
+inline uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
+  assert(seq <= kMaxSequenceNumber);
+  assert(IsExtendedValueType(t));
+  return (seq << 8) | t;
+}
 
 // Given the result of PackSequenceAndType, store the sequence number in *seq
 // and the ValueType in *t.
-extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t);
+inline void UnPackSequenceAndType(uint64_t packed, uint64_t* seq,
+                                  ValueType* t) {
+  *seq = packed >> 8;
+  *t = static_cast<ValueType>(packed & 0xff);
+
+  assert(*seq <= kMaxSequenceNumber);
+  assert(IsExtendedValueType(*t));
+}
 
 EntryType GetEntryType(ValueType value_type);
 
 // Append the serialization of "key" to *result.
 extern void AppendInternalKey(std::string* result,
                               const ParsedInternalKey& key);
+
+// Append the serialization of "key" to *result, replacing the original
+// timestamp with argument ts.
+extern void AppendInternalKeyWithDifferentTimestamp(
+    std::string* result, const ParsedInternalKey& key, const Slice& ts);
+
 // Serialized internal key consists of user key followed by footer.
 // This function appends the footer to *result, assuming that *result already
 // contains the user key at the end.
@@ -142,19 +171,20 @@ extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
 // stores the parsed data in "*result", and returns true.
 //
 // On error, returns false, leaves "*result" in an undefined state.
-extern bool ParseInternalKey(const Slice& internal_key,
-                             ParsedInternalKey* result);
+extern Status ParseInternalKey(const Slice& internal_key,
+                               ParsedInternalKey* result, bool log_err_key);
 
 // Returns the user key portion of an internal key.
 inline Slice ExtractUserKey(const Slice& internal_key) {
-  assert(internal_key.size() >= 8);
-  return Slice(internal_key.data(), internal_key.size() - 8);
+  assert(internal_key.size() >= kNumInternalBytes);
+  return Slice(internal_key.data(), internal_key.size() - kNumInternalBytes);
 }
 
 inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key,
                                              size_t ts_sz) {
-  assert(internal_key.size() >= 8 + ts_sz);
-  return Slice(internal_key.data(), internal_key.size() - 8 - ts_sz);
+  assert(internal_key.size() >= kNumInternalBytes + ts_sz);
+  return Slice(internal_key.data(),
+               internal_key.size() - kNumInternalBytes - ts_sz);
 }
 
 inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
@@ -162,10 +192,15 @@ inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
   return Slice(user_key.data(), user_key.size() - ts_sz);
 }
 
+inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
+  assert(user_key.size() >= ts_sz);
+  return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz);
+}
+
 inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
-  assert(internal_key.size() >= 8);
+  assert(internal_key.size() >= kNumInternalBytes);
   const size_t n = internal_key.size();
-  return DecodeFixed64(internal_key.data() + n - 8);
+  return DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
 }
 
 inline ValueType ExtractValueType(const Slice& internal_key) {
@@ -186,10 +221,22 @@ class InternalKeyComparator
   std::string name_;
 
  public:
-  explicit InternalKeyComparator(const Comparator* c)
-      : user_comparator_(c),
-        name_("rocksdb.InternalKeyComparator:" +
-              std::string(user_comparator_.Name())) {}
+  // `InternalKeyComparator`s constructed with the default constructor are not
+  // usable and will segfault on any attempt to use them for comparisons.
+  InternalKeyComparator() = default;
+
+  // @param named If true, assign a name to this comparator based on the
+  //    underlying comparator's name. This involves an allocation and copy in
+  //    this constructor to precompute the result of `Name()`. To avoid this
+  //    overhead, set `named` to false. In that case, `Name()` will return a
+  //    generic name that is non-specific to the underlying comparator.
+  explicit InternalKeyComparator(const Comparator* c, bool named = true)
+      : Comparator(c->timestamp_size()), user_comparator_(c) {
+    if (named) {
+      name_ = "rocksdb.InternalKeyComparator:" +
+              std::string(user_comparator_.Name());
+    }
+  }
   virtual ~InternalKeyComparator() {}
 
   virtual const char* Name() const override;
@@ -206,6 +253,12 @@ class InternalKeyComparator
 
   int Compare(const InternalKey& a, const InternalKey& b) const;
   int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const;
+  // In this `Compare()` overload, the sequence numbers provided in
+  // `a_global_seqno` and `b_global_seqno` override the sequence numbers in `a`
+  // and `b`, respectively. To disable sequence number override(s), provide the
+  // value `kDisableGlobalSequenceNumber`.
+  int Compare(const Slice& a, SequenceNumber a_global_seqno, const Slice& b,
+              SequenceNumber b_global_seqno) const;
   virtual const Comparator* GetRootComparator() const override {
     return user_comparator_.GetRootComparator();
   }
@@ -238,7 +291,8 @@ class InternalKey {
 
   bool Valid() const {
     ParsedInternalKey parsed;
-    return ParseInternalKey(Slice(rep_), &parsed);
+    return (ParseInternalKey(Slice(rep_), &parsed, false /* log_err_key */)
+                .ok());  // TODO
   }
 
   void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
@@ -271,7 +325,7 @@ class InternalKey {
     AppendInternalKeyFooter(&rep_, s, t);
   }
 
-  std::string DebugString(bool hex = false) const;
+  std::string DebugString(bool hex) const;
 };
 
 inline int InternalKeyComparator::Compare(const InternalKey& a,
@@ -279,36 +333,47 @@ inline int InternalKeyComparator::Compare(const InternalKey& a,
   return Compare(a.Encode(), b.Encode());
 }
 
-inline bool ParseInternalKey(const Slice& internal_key,
-                             ParsedInternalKey* result) {
+inline Status ParseInternalKey(const Slice& internal_key,
+                               ParsedInternalKey* result, bool log_err_key) {
   const size_t n = internal_key.size();
-  if (n < 8) return false;
-  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+
+  if (n < kNumInternalBytes) {
+    return Status::Corruption("Corrupted Key: Internal Key too small. Size=" +
+                              std::to_string(n) + ". ");
+  }
+
+  uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
   unsigned char c = num & 0xff;
   result->sequence = num >> 8;
   result->type = static_cast<ValueType>(c);
   assert(result->type <= ValueType::kMaxValue);
-  result->user_key = Slice(internal_key.data(), n - 8);
-  return IsExtendedValueType(result->type);
+  result->user_key = Slice(internal_key.data(), n - kNumInternalBytes);
+
+  if (IsExtendedValueType(result->type)) {
+    return Status::OK();
+  } else {
+    return Status::Corruption("Corrupted Key",
+                              result->DebugString(log_err_key, true));
+  }
 }
 
 // Update the sequence number in the internal key.
 // Guarantees not to invalidate ikey.data().
 inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) {
   size_t ikey_sz = ikey->size();
-  assert(ikey_sz >= 8);
+  assert(ikey_sz >= kNumInternalBytes);
   uint64_t newval = (seq << 8) | t;
 
   // Note: Since C++11, strings are guaranteed to be stored contiguously and
   // string::operator[]() is guaranteed not to change ikey.data().
-  EncodeFixed64(&(*ikey)[ikey_sz - 8], newval);
+  EncodeFixed64(&(*ikey)[ikey_sz - kNumInternalBytes], newval);
 }
 
 // Get the sequence number from the internal key
 inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
   const size_t n = internal_key.size();
-  assert(n >= 8);
-  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
+  assert(n >= kNumInternalBytes);
+  uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
   return num >> 8;
 }
 
@@ -347,8 +412,8 @@ class IterKey {
     if (IsUserKey()) {
       return Slice(key_, key_size_);
     } else {
-      assert(key_size_ >= 8);
-      return Slice(key_, key_size_ - 8);
+      assert(key_size_ >= kNumInternalBytes);
+      return Slice(key_, key_size_ - kNumInternalBytes);
     }
   }
 
@@ -406,9 +471,9 @@ class IterKey {
   // and returns a Slice referencing the new copy.
   Slice SetInternalKey(const Slice& key, ParsedInternalKey* ikey) {
     size_t key_n = key.size();
-    assert(key_n >= 8);
+    assert(key_n >= kNumInternalBytes);
     SetInternalKey(key);
-    ikey->user_key = Slice(key_, key_n - 8);
+    ikey->user_key = Slice(key_, key_n - kNumInternalBytes);
     return Slice(key_, key_n);
   }
 
@@ -423,35 +488,47 @@ class IterKey {
 
   // Update the sequence number in the internal key.  Guarantees not to
   // invalidate slices to the key (and the user key).
-  void UpdateInternalKey(uint64_t seq, ValueType t) {
+  void UpdateInternalKey(uint64_t seq, ValueType t, const Slice* ts = nullptr) {
     assert(!IsKeyPinned());
-    assert(key_size_ >= 8);
+    assert(key_size_ >= kNumInternalBytes);
+    if (ts) {
+      assert(key_size_ >= kNumInternalBytes + ts->size());
+      memcpy(&buf_[key_size_ - kNumInternalBytes - ts->size()], ts->data(),
+             ts->size());
+    }
     uint64_t newval = (seq << 8) | t;
-    EncodeFixed64(&buf_[key_size_ - 8], newval);
+    EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval);
   }
 
   bool IsKeyPinned() const { return (key_ != buf_); }
 
   void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
                       SequenceNumber s,
-                      ValueType value_type = kValueTypeForSeek) {
+                      ValueType value_type = kValueTypeForSeek,
+                      const Slice* ts = nullptr) {
     size_t psize = key_prefix.size();
     size_t usize = user_key.size();
-    EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t));
+    size_t ts_sz = (ts != nullptr ? ts->size() : 0);
+    EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t) + ts_sz);
     if (psize > 0) {
       memcpy(buf_, key_prefix.data(), psize);
     }
     memcpy(buf_ + psize, user_key.data(), usize);
-    EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type));
+    if (ts) {
+      memcpy(buf_ + psize + usize, ts->data(), ts_sz);
+    }
+    EncodeFixed64(buf_ + usize + psize + ts_sz,
+                  PackSequenceAndType(s, value_type));
 
     key_ = buf_;
-    key_size_ = psize + usize + sizeof(uint64_t);
+    key_size_ = psize + usize + sizeof(uint64_t) + ts_sz;
     is_user_key_ = false;
   }
 
   void SetInternalKey(const Slice& user_key, SequenceNumber s,
-                      ValueType value_type = kValueTypeForSeek) {
-    SetInternalKey(Slice(), user_key, s, value_type);
+                      ValueType value_type = kValueTypeForSeek,
+                      const Slice* ts = nullptr) {
+    SetInternalKey(Slice(), user_key, s, value_type, ts);
   }
 
   void Reserve(size_t size) {
@@ -625,8 +702,10 @@ inline int InternalKeyComparator::Compare(const Slice& akey,
   //    decreasing type (though sequence# should be enough to disambiguate)
   int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
   if (r == 0) {
-    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
-    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
+    const uint64_t anum =
+        DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes);
+    const uint64_t bnum =
+        DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes);
     if (anum > bnum) {
       r = -1;
     } else if (anum < bnum) {
@@ -644,8 +723,10 @@ inline int InternalKeyComparator::CompareKeySeq(const Slice& akey,
   int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
   if (r == 0) {
     // Shift the number to exclude the last byte which contains the value type
-    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8;
-    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8;
+    const uint64_t anum =
+        DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes) >> 8;
+    const uint64_t bnum =
+        DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes) >> 8;
     if (anum > bnum) {
       r = -1;
     } else if (anum < bnum) {
@@ -655,6 +736,32 @@ inline int InternalKeyComparator::CompareKeySeq(const Slice& akey,
   return r;
 }
 
+inline int InternalKeyComparator::Compare(const Slice& a,
+                                          SequenceNumber a_global_seqno,
+                                          const Slice& b,
+                                          SequenceNumber b_global_seqno) const {
+  int r = user_comparator_.Compare(ExtractUserKey(a), ExtractUserKey(b));
+  if (r == 0) {
+    uint64_t a_footer, b_footer;
+    if (a_global_seqno == kDisableGlobalSequenceNumber) {
+      a_footer = ExtractInternalKeyFooter(a);
+    } else {
+      a_footer = PackSequenceAndType(a_global_seqno, ExtractValueType(a));
+    }
+    if (b_global_seqno == kDisableGlobalSequenceNumber) {
+      b_footer = ExtractInternalKeyFooter(b);
+    } else {
+      b_footer = PackSequenceAndType(b_global_seqno, ExtractValueType(b));
+    }
+    if (a_footer > b_footer) {
+      r = -1;
+    } else if (a_footer < b_footer) {
+      r = +1;
+    }
+  }
+  return r;
+}
+
 // Wrap InternalKeyComparator as a comparator class for ParsedInternalKey.
 struct ParsedInternalKeyComparator {
   explicit ParsedInternalKeyComparator(const InternalKeyComparator* c)