]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/trace_replay/trace_replay.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / trace_replay / trace_replay.cc
index 949cd450d01b8cb6df3c0305beda78806b2906ab..37b95852b7463c09ffd0cbdafa11e53ea110dfd6 100644 (file)
@@ -8,23 +8,23 @@
 #include <chrono>
 #include <sstream>
 #include <thread>
+
 #include "db/db_impl/db_impl.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
 #include "rocksdb/slice.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_reader_writer.h"
 #include "rocksdb/write_batch.h"
 #include "util/coding.h"
 #include "util/string_util.h"
-#include "util/threadpool_imp.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 const std::string kTraceMagic = "feedcafedeadbeef";
 
 namespace {
-void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
-  PutFixed32(dst, cf_id);
-  PutLengthPrefixedSlice(dst, key);
-}
-
 void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
   Slice buf(buffer);
   GetFixed32(&buf, cf_id);
@@ -32,6 +32,54 @@ void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
 }
 }  // namespace
 
+Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
+  if (v_string.find_first_of('.') == std::string::npos ||
+      v_string.find_first_of('.') != v_string.find_last_of('.')) {
+    return Status::Corruption(
+        "Corrupted trace file. Incorrect version format.");
+  }
+  int tmp_num = 0;
+  for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
+    if (v_string[i] == '.') {
+      continue;
+    } else if (isdigit(v_string[i])) {
+      tmp_num = tmp_num * 10 + (v_string[i] - '0');
+    } else {
+      return Status::Corruption(
+          "Corrupted trace file. Incorrect version format");
+    }
+  }
+  *v_num = tmp_num;
+  return Status::OK();
+}
+
+Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
+                                      int* db_version) {
+  std::vector<std::string> s_vec;
+  int begin = 0, end;
+  for (int i = 0; i < 3; i++) {
+    assert(header.payload.find("\t", begin) != std::string::npos);
+    end = static_cast<int>(header.payload.find("\t", begin));
+    s_vec.push_back(header.payload.substr(begin, end - begin));
+    begin = end + 1;
+  }
+
+  std::string t_v_str, db_v_str;
+  assert(s_vec.size() == 3);
+  assert(s_vec[1].find("Trace Version: ") != std::string::npos);
+  t_v_str = s_vec[1].substr(15);
+  assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
+  db_v_str = s_vec[2].substr(17);
+
+  Status s;
+  s = ParseVersionStr(t_v_str, trace_version);
+  if (s != Status::OK()) {
+    return s;
+  }
+  s = ParseVersionStr(db_v_str, db_version);
+  return s;
+}
+
 void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
   assert(encoded_trace);
   PutFixed64(encoded_trace, trace.ts);
@@ -56,12 +104,248 @@ Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
   return Status::OK();
 }
 
-Tracer::Tracer(Env* env, const TraceOptions& trace_options,
+Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
+                                  Trace* header) {
+  Status s = TracerHelper::DecodeTrace(encoded_trace, header);
+
+  if (header->type != kTraceBegin) {
+    return Status::Corruption("Corrupted trace file. Incorrect header.");
+  }
+  if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
+    return Status::Corruption("Corrupted trace file. Incorrect magic.");
+  }
+
+  return s;
+}
+
+bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
+                                 const TracePayloadType payload_type) {
+  uint64_t old_state = payload_map;
+  uint64_t tmp = 1;
+  payload_map |= (tmp << payload_type);
+  return old_state != payload_map;
+}
+
+Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
+                                       std::unique_ptr<TraceRecord>* record) {
+  assert(trace != nullptr);
+
+  if (record != nullptr) {
+    record->reset(nullptr);
+  }
+
+  switch (trace->type) {
+    // Write
+    case kTraceWrite: {
+      PinnableSlice rep;
+      if (trace_file_version < 2) {
+        rep.PinSelf(trace->payload);
+      } else {
+        Slice buf(trace->payload);
+        GetFixed64(&buf, &trace->payload_map);
+        int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+        Slice write_batch_data;
+        while (payload_map) {
+          // Find the rightmost set bit.
+          uint32_t set_pos =
+              static_cast<uint32_t>(log2(payload_map & -payload_map));
+          switch (set_pos) {
+            case TracePayloadType::kWriteBatchData: {
+              GetLengthPrefixedSlice(&buf, &write_batch_data);
+              break;
+            }
+            default: {
+              assert(false);
+            }
+          }
+          // unset the rightmost bit.
+          payload_map &= (payload_map - 1);
+        }
+        rep.PinSelf(write_batch_data);
+      }
+
+      if (record != nullptr) {
+        record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
+      }
+
+      return Status::OK();
+    }
+    // Get
+    case kTraceGet: {
+      uint32_t cf_id = 0;
+      Slice get_key;
+
+      if (trace_file_version < 2) {
+        DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+      } else {
+        Slice buf(trace->payload);
+        GetFixed64(&buf, &trace->payload_map);
+        int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+        while (payload_map) {
+          // Find the rightmost set bit.
+          uint32_t set_pos =
+              static_cast<uint32_t>(log2(payload_map & -payload_map));
+          switch (set_pos) {
+            case TracePayloadType::kGetCFID: {
+              GetFixed32(&buf, &cf_id);
+              break;
+            }
+            case TracePayloadType::kGetKey: {
+              GetLengthPrefixedSlice(&buf, &get_key);
+              break;
+            }
+            default: {
+              assert(false);
+            }
+          }
+          // unset the rightmost bit.
+          payload_map &= (payload_map - 1);
+        }
+      }
+
+      if (record != nullptr) {
+        PinnableSlice ps;
+        ps.PinSelf(get_key);
+        record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+      }
+
+      return Status::OK();
+    }
+    // Iterator Seek and SeekForPrev
+    case kTraceIteratorSeek:
+    case kTraceIteratorSeekForPrev: {
+      uint32_t cf_id = 0;
+      Slice iter_key;
+      Slice lower_bound;
+      Slice upper_bound;
+
+      if (trace_file_version < 2) {
+        DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+      } else {
+        Slice buf(trace->payload);
+        GetFixed64(&buf, &trace->payload_map);
+        int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+        while (payload_map) {
+          // Find the rightmost set bit.
+          uint32_t set_pos =
+              static_cast<uint32_t>(log2(payload_map & -payload_map));
+          switch (set_pos) {
+            case TracePayloadType::kIterCFID: {
+              GetFixed32(&buf, &cf_id);
+              break;
+            }
+            case TracePayloadType::kIterKey: {
+              GetLengthPrefixedSlice(&buf, &iter_key);
+              break;
+            }
+            case TracePayloadType::kIterLowerBound: {
+              GetLengthPrefixedSlice(&buf, &lower_bound);
+              break;
+            }
+            case TracePayloadType::kIterUpperBound: {
+              GetLengthPrefixedSlice(&buf, &upper_bound);
+              break;
+            }
+            default: {
+              assert(false);
+            }
+          }
+          // unset the rightmost bit.
+          payload_map &= (payload_map - 1);
+        }
+      }
+
+      if (record != nullptr) {
+        PinnableSlice ps_key;
+        ps_key.PinSelf(iter_key);
+        PinnableSlice ps_lower;
+        ps_lower.PinSelf(lower_bound);
+        PinnableSlice ps_upper;
+        ps_upper.PinSelf(upper_bound);
+        record->reset(new IteratorSeekQueryTraceRecord(
+            static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
+            cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
+            trace->ts));
+      }
+
+      return Status::OK();
+    }
+    // MultiGet
+    case kTraceMultiGet: {
+      if (trace_file_version < 2) {
+        return Status::Corruption("MultiGet is not supported.");
+      }
+
+      uint32_t multiget_size = 0;
+      std::vector<uint32_t> cf_ids;
+      std::vector<PinnableSlice> multiget_keys;
+
+      Slice cfids_payload;
+      Slice keys_payload;
+      Slice buf(trace->payload);
+      GetFixed64(&buf, &trace->payload_map);
+      int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+      while (payload_map) {
+        // Find the rightmost set bit.
+        uint32_t set_pos =
+            static_cast<uint32_t>(log2(payload_map & -payload_map));
+        switch (set_pos) {
+          case TracePayloadType::kMultiGetSize: {
+            GetFixed32(&buf, &multiget_size);
+            break;
+          }
+          case TracePayloadType::kMultiGetCFIDs: {
+            GetLengthPrefixedSlice(&buf, &cfids_payload);
+            break;
+          }
+          case TracePayloadType::kMultiGetKeys: {
+            GetLengthPrefixedSlice(&buf, &keys_payload);
+            break;
+          }
+          default: {
+            assert(false);
+          }
+        }
+        // unset the rightmost bit.
+        payload_map &= (payload_map - 1);
+      }
+      if (multiget_size == 0) {
+        return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+      }
+
+      // Decode the cfids_payload and keys_payload
+      cf_ids.reserve(multiget_size);
+      multiget_keys.reserve(multiget_size);
+      for (uint32_t i = 0; i < multiget_size; i++) {
+        uint32_t tmp_cfid;
+        Slice tmp_key;
+        GetFixed32(&cfids_payload, &tmp_cfid);
+        GetLengthPrefixedSlice(&keys_payload, &tmp_key);
+        cf_ids.push_back(tmp_cfid);
+        Slice s(tmp_key);
+        PinnableSlice ps;
+        ps.PinSelf(s);
+        multiget_keys.push_back(std::move(ps));
+      }
+
+      if (record != nullptr) {
+        record->reset(new MultiGetQueryTraceRecord(
+            std::move(cf_ids), std::move(multiget_keys), trace->ts));
+      }
+
+      return Status::OK();
+    }
+    default:
+      return Status::NotSupported("Unsupported trace type.");
+  }
+}
+
+Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
                std::unique_ptr<TraceWriter>&& trace_writer)
-    : env_(env),
+    : clock_(clock),
       trace_options_(trace_options),
       trace_writer_(std::move(trace_writer)),
-      trace_request_count_ (0) {
+      trace_request_count_(0) {
   // TODO: What if this fails?
   WriteHeader().PermitUncheckedError();
 }
@@ -74,9 +358,12 @@ Status Tracer::Write(WriteBatch* write_batch) {
     return Status::OK();
   }
   Trace trace;
-  trace.ts = env_->NowMicros();
+  trace.ts = clock_->NowMicros();
   trace.type = trace_type;
-  trace.payload = write_batch->Data();
+  TracerHelper::SetPayloadMap(trace.payload_map,
+                              TracePayloadType::kWriteBatchData);
+  PutFixed64(&trace.payload, trace.payload_map);
+  PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
   return WriteTrace(trace);
 }
 
@@ -86,33 +373,158 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
     return Status::OK();
   }
   Trace trace;
-  trace.ts = env_->NowMicros();
+  trace.ts = clock_->NowMicros();
   trace.type = trace_type;
-  EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
+  // Set the payloadmap of the struct member that will be encoded in the
+  // payload.
+  TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
+  TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
+  // Encode the Get struct members into payload. Make sure add them in order.
+  PutFixed64(&trace.payload, trace.payload_map);
+  PutFixed32(&trace.payload, column_family->GetID());
+  PutLengthPrefixedSlice(&trace.payload, key);
   return WriteTrace(trace);
 }
 
-Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
+Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
+                            const Slice& lower_bound, const Slice upper_bound) {
   TraceType trace_type = kTraceIteratorSeek;
   if (ShouldSkipTrace(trace_type)) {
     return Status::OK();
   }
   Trace trace;
-  trace.ts = env_->NowMicros();
+  trace.ts = clock_->NowMicros();
   trace.type = trace_type;
-  EncodeCFAndKey(&trace.payload, cf_id, key);
+  // Set the payloadmap of the struct member that will be encoded in the
+  // payload.
+  TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
+  TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
+  if (lower_bound.size() > 0) {
+    TracerHelper::SetPayloadMap(trace.payload_map,
+                                TracePayloadType::kIterLowerBound);
+  }
+  if (upper_bound.size() > 0) {
+    TracerHelper::SetPayloadMap(trace.payload_map,
+                                TracePayloadType::kIterUpperBound);
+  }
+  // Encode the Iterator struct members into payload. Make sure add them in
+  // order.
+  PutFixed64(&trace.payload, trace.payload_map);
+  PutFixed32(&trace.payload, cf_id);
+  PutLengthPrefixedSlice(&trace.payload, key);
+  if (lower_bound.size() > 0) {
+    PutLengthPrefixedSlice(&trace.payload, lower_bound);
+  }
+  if (upper_bound.size() > 0) {
+    PutLengthPrefixedSlice(&trace.payload, upper_bound);
+  }
   return WriteTrace(trace);
 }
 
-Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
+Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
+                                   const Slice& lower_bound,
+                                   const Slice upper_bound) {
   TraceType trace_type = kTraceIteratorSeekForPrev;
   if (ShouldSkipTrace(trace_type)) {
     return Status::OK();
   }
   Trace trace;
-  trace.ts = env_->NowMicros();
+  trace.ts = clock_->NowMicros();
+  trace.type = trace_type;
+  // Set the payloadmap of the struct member that will be encoded in the
+  // payload.
+  TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
+  TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
+  if (lower_bound.size() > 0) {
+    TracerHelper::SetPayloadMap(trace.payload_map,
+                                TracePayloadType::kIterLowerBound);
+  }
+  if (upper_bound.size() > 0) {
+    TracerHelper::SetPayloadMap(trace.payload_map,
+                                TracePayloadType::kIterUpperBound);
+  }
+  // Encode the Iterator struct members into payload. Make sure add them in
+  // order.
+  PutFixed64(&trace.payload, trace.payload_map);
+  PutFixed32(&trace.payload, cf_id);
+  PutLengthPrefixedSlice(&trace.payload, key);
+  if (lower_bound.size() > 0) {
+    PutLengthPrefixedSlice(&trace.payload, lower_bound);
+  }
+  if (upper_bound.size() > 0) {
+    PutLengthPrefixedSlice(&trace.payload, upper_bound);
+  }
+  return WriteTrace(trace);
+}
+
+Status Tracer::MultiGet(const size_t num_keys,
+                        ColumnFamilyHandle** column_families,
+                        const Slice* keys) {
+  if (num_keys == 0) {
+    return Status::OK();
+  }
+  std::vector<ColumnFamilyHandle*> v_column_families;
+  std::vector<Slice> v_keys;
+  v_column_families.resize(num_keys);
+  v_keys.resize(num_keys);
+  for (size_t i = 0; i < num_keys; i++) {
+    v_column_families[i] = column_families[i];
+    v_keys[i] = keys[i];
+  }
+  return MultiGet(v_column_families, v_keys);
+}
+
+Status Tracer::MultiGet(const size_t num_keys,
+                        ColumnFamilyHandle* column_family, const Slice* keys) {
+  if (num_keys == 0) {
+    return Status::OK();
+  }
+  std::vector<ColumnFamilyHandle*> column_families;
+  std::vector<Slice> v_keys;
+  column_families.resize(num_keys);
+  v_keys.resize(num_keys);
+  for (size_t i = 0; i < num_keys; i++) {
+    column_families[i] = column_family;
+    v_keys[i] = keys[i];
+  }
+  return MultiGet(column_families, v_keys);
+}
+
+Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
+                        const std::vector<Slice>& keys) {
+  if (column_families.size() != keys.size()) {
+    return Status::Corruption("the CFs size and keys size does not match!");
+  }
+  TraceType trace_type = kTraceMultiGet;
+  if (ShouldSkipTrace(trace_type)) {
+    return Status::OK();
+  }
+  uint32_t multiget_size = static_cast<uint32_t>(keys.size());
+  Trace trace;
+  trace.ts = clock_->NowMicros();
   trace.type = trace_type;
-  EncodeCFAndKey(&trace.payload, cf_id, key);
+  // Set the payloadmap of the struct member that will be encoded in the
+  // payload.
+  TracerHelper::SetPayloadMap(trace.payload_map,
+                              TracePayloadType::kMultiGetSize);
+  TracerHelper::SetPayloadMap(trace.payload_map,
+                              TracePayloadType::kMultiGetCFIDs);
+  TracerHelper::SetPayloadMap(trace.payload_map,
+                              TracePayloadType::kMultiGetKeys);
+  // Encode the CFIDs inorder
+  std::string cfids_payload;
+  std::string keys_payload;
+  for (uint32_t i = 0; i < multiget_size; i++) {
+    assert(i < column_families.size());
+    assert(i < keys.size());
+    PutFixed32(&cfids_payload, column_families[i]->GetID());
+    PutLengthPrefixedSlice(&keys_payload, keys[i]);
+  }
+  // Encode the Get struct members into payload. Make sure add them in order.
+  PutFixed64(&trace.payload, trace.payload_map);
+  PutFixed32(&trace.payload, multiget_size);
+  PutLengthPrefixedSlice(&trace.payload, cfids_payload);
+  PutLengthPrefixedSlice(&trace.payload, keys_payload);
   return WriteTrace(trace);
 }
 
@@ -120,12 +532,46 @@ bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
   if (IsTraceFileOverMax()) {
     return true;
   }
-  if ((trace_options_.filter & kTraceFilterGet
-    && trace_type == kTraceGet)
-   || (trace_options_.filter & kTraceFilterWrite
-    && trace_type == kTraceWrite)) {
+
+  TraceFilterType filter_mask = kTraceFilterNone;
+  switch (trace_type) {
+    case kTraceNone:
+    case kTraceBegin:
+    case kTraceEnd:
+      filter_mask = kTraceFilterNone;
+      break;
+    case kTraceWrite:
+      filter_mask = kTraceFilterWrite;
+      break;
+    case kTraceGet:
+      filter_mask = kTraceFilterGet;
+      break;
+    case kTraceIteratorSeek:
+      filter_mask = kTraceFilterIteratorSeek;
+      break;
+    case kTraceIteratorSeekForPrev:
+      filter_mask = kTraceFilterIteratorSeekForPrev;
+      break;
+    case kBlockTraceIndexBlock:
+    case kBlockTraceFilterBlock:
+    case kBlockTraceDataBlock:
+    case kBlockTraceUncompressionDictBlock:
+    case kBlockTraceRangeDeletionBlock:
+    case kIOTracer:
+      filter_mask = kTraceFilterNone;
+      break;
+    case kTraceMultiGet:
+      filter_mask = kTraceFilterMultiGet;
+      break;
+    case kTraceMax:
+      assert(false);
+      filter_mask = kTraceFilterNone;
+      break;
+  }
+  if (filter_mask != kTraceFilterNone && trace_options_.filter & filter_mask) {
     return true;
   }
+
   ++trace_request_count_;
   if (trace_request_count_ < trace_options_.sampling_frequency) {
     return true;
@@ -142,13 +588,14 @@ bool Tracer::IsTraceFileOverMax() {
 Status Tracer::WriteHeader() {
   std::ostringstream s;
   s << kTraceMagic << "\t"
-    << "Trace Version: 0.1\t"
+    << "Trace Version: " << kTraceFileMajorVersion << "."
+    << kTraceFileMinorVersion << "\t"
     << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
     << "Format: Timestamp OpType Payload\n";
   std::string header(s.str());
 
   Trace trace;
-  trace.ts = env_->NowMicros();
+  trace.ts = clock_->NowMicros();
   trace.type = kTraceBegin;
   trace.payload = header;
   return WriteTrace(trace);
@@ -156,8 +603,10 @@ Status Tracer::WriteHeader() {
 
 Status Tracer::WriteFooter() {
   Trace trace;
-  trace.ts = env_->NowMicros();
+  trace.ts = clock_->NowMicros();
   trace.type = kTraceEnd;
+  TracerHelper::SetPayloadMap(trace.payload_map,
+                              TracePayloadType::kEmptyPayload);
   trace.payload = "";
   return WriteTrace(trace);
 }
@@ -170,322 +619,4 @@ Status Tracer::WriteTrace(const Trace& trace) {
 
 Status Tracer::Close() { return WriteFooter(); }
 
-Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
-                   std::unique_ptr<TraceReader>&& reader)
-    : trace_reader_(std::move(reader)) {
-  assert(db != nullptr);
-  db_ = static_cast<DBImpl*>(db->GetRootDB());
-  env_ = Env::Default();
-  for (ColumnFamilyHandle* cfh : handles) {
-    cf_map_[cfh->GetID()] = cfh;
-  }
-  fast_forward_ = 1;
-}
-
-Replayer::~Replayer() { trace_reader_.reset(); }
-
-Status Replayer::SetFastForward(uint32_t fast_forward) {
-  Status s;
-  if (fast_forward < 1) {
-    s = Status::InvalidArgument("Wrong fast forward speed!");
-  } else {
-    fast_forward_ = fast_forward;
-    s = Status::OK();
-  }
-  return s;
-}
-
-Status Replayer::Replay() {
-  Status s;
-  Trace header;
-  s = ReadHeader(&header);
-  if (!s.ok()) {
-    return s;
-  }
-
-  std::chrono::system_clock::time_point replay_epoch =
-      std::chrono::system_clock::now();
-  WriteOptions woptions;
-  ReadOptions roptions;
-  Trace trace;
-  uint64_t ops = 0;
-  Iterator* single_iter = nullptr;
-  while (s.ok()) {
-    trace.reset();
-    s = ReadTrace(&trace);
-    if (!s.ok()) {
-      break;
-    }
-
-    std::this_thread::sleep_until(
-        replay_epoch +
-        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
-    if (trace.type == kTraceWrite) {
-      WriteBatch batch(trace.payload);
-      db_->Write(woptions, &batch);
-      ops++;
-    } else if (trace.type == kTraceGet) {
-      uint32_t cf_id = 0;
-      Slice key;
-      DecodeCFAndKey(trace.payload, &cf_id, &key);
-      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
-        return Status::Corruption("Invalid Column Family ID.");
-      }
-
-      std::string value;
-      if (cf_id == 0) {
-        db_->Get(roptions, key, &value);
-      } else {
-        db_->Get(roptions, cf_map_[cf_id], key, &value);
-      }
-      ops++;
-    } else if (trace.type == kTraceIteratorSeek) {
-      uint32_t cf_id = 0;
-      Slice key;
-      DecodeCFAndKey(trace.payload, &cf_id, &key);
-      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
-        return Status::Corruption("Invalid Column Family ID.");
-      }
-
-      if (cf_id == 0) {
-        single_iter = db_->NewIterator(roptions);
-      } else {
-        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
-      }
-      single_iter->Seek(key);
-      ops++;
-      delete single_iter;
-    } else if (trace.type == kTraceIteratorSeekForPrev) {
-      // Currently, only support to call the Seek()
-      uint32_t cf_id = 0;
-      Slice key;
-      DecodeCFAndKey(trace.payload, &cf_id, &key);
-      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
-        return Status::Corruption("Invalid Column Family ID.");
-      }
-
-      if (cf_id == 0) {
-        single_iter = db_->NewIterator(roptions);
-      } else {
-        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
-      }
-      single_iter->SeekForPrev(key);
-      ops++;
-      delete single_iter;
-    } else if (trace.type == kTraceEnd) {
-      // Do nothing for now.
-      // TODO: Add some validations later.
-      break;
-    }
-  }
-
-  if (s.IsIncomplete()) {
-    // Reaching eof returns Incomplete status at the moment.
-    // Could happen when killing a process without calling EndTrace() API.
-    // TODO: Add better error handling.
-    return Status::OK();
-  }
-  return s;
-}
-
-// The trace can be replayed with multithread by configurnge the number of
-// threads in the thread pool. Trace records are read from the trace file
-// sequentially and the corresponding queries are scheduled in the task
-// queue based on the timestamp. Currently, we support Write_batch (Put,
-// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
-Status Replayer::MultiThreadReplay(uint32_t threads_num) {
-  Status s;
-  Trace header;
-  s = ReadHeader(&header);
-  if (!s.ok()) {
-    return s;
-  }
-
-  ThreadPoolImpl thread_pool;
-  thread_pool.SetHostEnv(env_);
-
-  if (threads_num > 1) {
-    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
-  } else {
-    thread_pool.SetBackgroundThreads(1);
-  }
-
-  std::chrono::system_clock::time_point replay_epoch =
-      std::chrono::system_clock::now();
-  WriteOptions woptions;
-  ReadOptions roptions;
-  uint64_t ops = 0;
-  while (s.ok()) {
-    std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
-    ra->db = db_;
-    s = ReadTrace(&(ra->trace_entry));
-    if (!s.ok()) {
-      break;
-    }
-    ra->cf_map = &cf_map_;
-    ra->woptions = woptions;
-    ra->roptions = roptions;
-
-    std::this_thread::sleep_until(
-        replay_epoch + std::chrono::microseconds(
-                           (ra->trace_entry.ts - header.ts) / fast_forward_));
-    if (ra->trace_entry.type == kTraceWrite) {
-      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceGet) {
-      thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
-      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
-                           nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
-      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
-                           nullptr, nullptr);
-      ops++;
-    } else if (ra->trace_entry.type == kTraceEnd) {
-      // Do nothing for now.
-      // TODO: Add some validations later.
-      break;
-    } else {
-      // Other trace entry types that are not implemented for replay.
-      // To finish the replay, we continue the process.
-      continue;
-    }
-  }
-
-  if (s.IsIncomplete()) {
-    // Reaching eof returns Incomplete status at the moment.
-    // Could happen when killing a process without calling EndTrace() API.
-    // TODO: Add better error handling.
-    s = Status::OK();
-  }
-  thread_pool.JoinAllThreads();
-  return s;
-}
-
-Status Replayer::ReadHeader(Trace* header) {
-  assert(header != nullptr);
-  Status s = ReadTrace(header);
-  if (!s.ok()) {
-    return s;
-  }
-  if (header->type != kTraceBegin) {
-    return Status::Corruption("Corrupted trace file. Incorrect header.");
-  }
-  if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
-    return Status::Corruption("Corrupted trace file. Incorrect magic.");
-  }
-
-  return s;
-}
-
-Status Replayer::ReadFooter(Trace* footer) {
-  assert(footer != nullptr);
-  Status s = ReadTrace(footer);
-  if (!s.ok()) {
-    return s;
-  }
-  if (footer->type != kTraceEnd) {
-    return Status::Corruption("Corrupted trace file. Incorrect footer.");
-  }
-
-  // TODO: Add more validations later
-  return s;
-}
-
-Status Replayer::ReadTrace(Trace* trace) {
-  assert(trace != nullptr);
-  std::string encoded_trace;
-  Status s = trace_reader_->Read(&encoded_trace);
-  if (!s.ok()) {
-    return s;
-  }
-  return TracerHelper::DecodeTrace(encoded_trace, trace);
-}
-
-void Replayer::BGWorkGet(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  uint32_t cf_id = 0;
-  Slice key;
-  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
-  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
-    return;
-  }
-
-  std::string value;
-  if (cf_id == 0) {
-    ra->db->Get(ra->roptions, key, &value);
-  } else {
-    ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
-  }
-
-  return;
-}
-
-void Replayer::BGWorkWriteBatch(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  WriteBatch batch(ra->trace_entry.payload);
-  ra->db->Write(ra->woptions, &batch);
-  return;
-}
-
-void Replayer::BGWorkIterSeek(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  uint32_t cf_id = 0;
-  Slice key;
-  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
-  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
-    return;
-  }
-
-  std::string value;
-  Iterator* single_iter = nullptr;
-  if (cf_id == 0) {
-    single_iter = ra->db->NewIterator(ra->roptions);
-  } else {
-    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
-  }
-  single_iter->Seek(key);
-  delete single_iter;
-  return;
-}
-
-void Replayer::BGWorkIterSeekForPrev(void* arg) {
-  std::unique_ptr<ReplayerWorkerArg> ra(
-      reinterpret_cast<ReplayerWorkerArg*>(arg));
-  assert(ra != nullptr);
-  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
-      ra->cf_map);
-  uint32_t cf_id = 0;
-  Slice key;
-  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
-  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
-    return;
-  }
-
-  std::string value;
-  Iterator* single_iter = nullptr;
-  if (cf_id == 0) {
-    single_iter = ra->db->NewIterator(ra->roptions);
-  } else {
-    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
-  }
-  single_iter->SeekForPrev(key);
-  delete single_iter;
-  return;
-}
-
 }  // namespace ROCKSDB_NAMESPACE