#include <chrono>
#include <sstream>
#include <thread>
+
#include "db/db_impl/db_impl.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
#include "rocksdb/slice.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
-#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE {
const std::string kTraceMagic = "feedcafedeadbeef";
namespace {
-void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
- PutFixed32(dst, cf_id);
- PutLengthPrefixedSlice(dst, key);
-}
-
void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
Slice buf(buffer);
GetFixed32(&buf, cf_id);
}
} // namespace
+Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
+ if (v_string.find_first_of('.') == std::string::npos ||
+ v_string.find_first_of('.') != v_string.find_last_of('.')) {
+ return Status::Corruption(
+ "Corrupted trace file. Incorrect version format.");
+ }
+ int tmp_num = 0;
+ for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
+ if (v_string[i] == '.') {
+ continue;
+ } else if (isdigit(v_string[i])) {
+ tmp_num = tmp_num * 10 + (v_string[i] - '0');
+ } else {
+ return Status::Corruption(
+ "Corrupted trace file. Incorrect version format");
+ }
+ }
+ *v_num = tmp_num;
+ return Status::OK();
+}
+
+Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
+ int* db_version) {
+ std::vector<std::string> s_vec;
+ int begin = 0, end;
+ for (int i = 0; i < 3; i++) {
+ assert(header.payload.find("\t", begin) != std::string::npos);
+ end = static_cast<int>(header.payload.find("\t", begin));
+ s_vec.push_back(header.payload.substr(begin, end - begin));
+ begin = end + 1;
+ }
+
+ std::string t_v_str, db_v_str;
+ assert(s_vec.size() == 3);
+ assert(s_vec[1].find("Trace Version: ") != std::string::npos);
+ t_v_str = s_vec[1].substr(15);
+ assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
+ db_v_str = s_vec[2].substr(17);
+
+ Status s;
+ s = ParseVersionStr(t_v_str, trace_version);
+ if (s != Status::OK()) {
+ return s;
+ }
+ s = ParseVersionStr(db_v_str, db_version);
+ return s;
+}
+
void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
assert(encoded_trace);
PutFixed64(encoded_trace, trace.ts);
return Status::OK();
}
-Tracer::Tracer(Env* env, const TraceOptions& trace_options,
+Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
+ Trace* header) {
+ Status s = TracerHelper::DecodeTrace(encoded_trace, header);
+
+ if (header->type != kTraceBegin) {
+ return Status::Corruption("Corrupted trace file. Incorrect header.");
+ }
+ if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
+ return Status::Corruption("Corrupted trace file. Incorrect magic.");
+ }
+
+ return s;
+}
+
+bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
+ const TracePayloadType payload_type) {
+ uint64_t old_state = payload_map;
+ uint64_t tmp = 1;
+ payload_map |= (tmp << payload_type);
+ return old_state != payload_map;
+}
+
+Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ assert(trace != nullptr);
+
+ if (record != nullptr) {
+ record->reset(nullptr);
+ }
+
+ switch (trace->type) {
+ // Write
+ case kTraceWrite: {
+ PinnableSlice rep;
+ if (trace_file_version < 2) {
+ rep.PinSelf(trace->payload);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ Slice write_batch_data;
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kWriteBatchData: {
+ GetLengthPrefixedSlice(&buf, &write_batch_data);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ rep.PinSelf(write_batch_data);
+ }
+
+ if (record != nullptr) {
+ record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
+ }
+
+ return Status::OK();
+ }
+ // Get
+ case kTraceGet: {
+ uint32_t cf_id = 0;
+ Slice get_key;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kGetCFID: {
+ GetFixed32(&buf, &cf_id);
+ break;
+ }
+ case TracePayloadType::kGetKey: {
+ GetLengthPrefixedSlice(&buf, &get_key);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ }
+
+ if (record != nullptr) {
+ PinnableSlice ps;
+ ps.PinSelf(get_key);
+ record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+ }
+
+ return Status::OK();
+ }
+ // Iterator Seek and SeekForPrev
+ case kTraceIteratorSeek:
+ case kTraceIteratorSeekForPrev: {
+ uint32_t cf_id = 0;
+ Slice iter_key;
+ Slice lower_bound;
+ Slice upper_bound;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kIterCFID: {
+ GetFixed32(&buf, &cf_id);
+ break;
+ }
+ case TracePayloadType::kIterKey: {
+ GetLengthPrefixedSlice(&buf, &iter_key);
+ break;
+ }
+ case TracePayloadType::kIterLowerBound: {
+ GetLengthPrefixedSlice(&buf, &lower_bound);
+ break;
+ }
+ case TracePayloadType::kIterUpperBound: {
+ GetLengthPrefixedSlice(&buf, &upper_bound);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ }
+
+ if (record != nullptr) {
+ PinnableSlice ps_key;
+ ps_key.PinSelf(iter_key);
+ PinnableSlice ps_lower;
+ ps_lower.PinSelf(lower_bound);
+ PinnableSlice ps_upper;
+ ps_upper.PinSelf(upper_bound);
+ record->reset(new IteratorSeekQueryTraceRecord(
+ static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
+ cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
+ trace->ts));
+ }
+
+ return Status::OK();
+ }
+ // MultiGet
+ case kTraceMultiGet: {
+ if (trace_file_version < 2) {
+ return Status::Corruption("MultiGet is not supported.");
+ }
+
+ uint32_t multiget_size = 0;
+ std::vector<uint32_t> cf_ids;
+ std::vector<PinnableSlice> multiget_keys;
+
+ Slice cfids_payload;
+ Slice keys_payload;
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kMultiGetSize: {
+ GetFixed32(&buf, &multiget_size);
+ break;
+ }
+ case TracePayloadType::kMultiGetCFIDs: {
+ GetLengthPrefixedSlice(&buf, &cfids_payload);
+ break;
+ }
+ case TracePayloadType::kMultiGetKeys: {
+ GetLengthPrefixedSlice(&buf, &keys_payload);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ if (multiget_size == 0) {
+ return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+ }
+
+ // Decode the cfids_payload and keys_payload
+ cf_ids.reserve(multiget_size);
+ multiget_keys.reserve(multiget_size);
+ for (uint32_t i = 0; i < multiget_size; i++) {
+ uint32_t tmp_cfid;
+ Slice tmp_key;
+ GetFixed32(&cfids_payload, &tmp_cfid);
+ GetLengthPrefixedSlice(&keys_payload, &tmp_key);
+ cf_ids.push_back(tmp_cfid);
+ Slice s(tmp_key);
+ PinnableSlice ps;
+ ps.PinSelf(s);
+ multiget_keys.push_back(std::move(ps));
+ }
+
+ if (record != nullptr) {
+ record->reset(new MultiGetQueryTraceRecord(
+ std::move(cf_ids), std::move(multiget_keys), trace->ts));
+ }
+
+ return Status::OK();
+ }
+ default:
+ return Status::NotSupported("Unsupported trace type.");
+ }
+}
+
+Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
- : env_(env),
+ : clock_(clock),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)),
- trace_request_count_ (0) {
+ trace_request_count_(0) {
// TODO: What if this fails?
WriteHeader().PermitUncheckedError();
}
return Status::OK();
}
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
trace.type = trace_type;
- trace.payload = write_batch->Data();
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kWriteBatchData);
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
return WriteTrace(trace);
}
return Status::OK();
}
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
trace.type = trace_type;
- EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
+ // Encode the Get struct members into payload. Make sure add them in order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, column_family->GetID());
+ PutLengthPrefixedSlice(&trace.payload, key);
return WriteTrace(trace);
}
-Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
+Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
+ const Slice& lower_bound, const Slice upper_bound) {
TraceType trace_type = kTraceIteratorSeek;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
trace.type = trace_type;
- EncodeCFAndKey(&trace.payload, cf_id, key);
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
+ if (lower_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterLowerBound);
+ }
+ if (upper_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterUpperBound);
+ }
+ // Encode the Iterator struct members into payload. Make sure add them in
+ // order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, cf_id);
+ PutLengthPrefixedSlice(&trace.payload, key);
+ if (lower_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, lower_bound);
+ }
+ if (upper_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, upper_bound);
+ }
return WriteTrace(trace);
}
-Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
+Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
+ const Slice& lower_bound,
+ const Slice upper_bound) {
TraceType trace_type = kTraceIteratorSeekForPrev;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
+ trace.type = trace_type;
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
+ if (lower_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterLowerBound);
+ }
+ if (upper_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterUpperBound);
+ }
+ // Encode the Iterator struct members into payload. Make sure add them in
+ // order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, cf_id);
+ PutLengthPrefixedSlice(&trace.payload, key);
+ if (lower_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, lower_bound);
+ }
+ if (upper_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, upper_bound);
+ }
+ return WriteTrace(trace);
+}
+
+Status Tracer::MultiGet(const size_t num_keys,
+ ColumnFamilyHandle** column_families,
+ const Slice* keys) {
+ if (num_keys == 0) {
+ return Status::OK();
+ }
+ std::vector<ColumnFamilyHandle*> v_column_families;
+ std::vector<Slice> v_keys;
+ v_column_families.resize(num_keys);
+ v_keys.resize(num_keys);
+ for (size_t i = 0; i < num_keys; i++) {
+ v_column_families[i] = column_families[i];
+ v_keys[i] = keys[i];
+ }
+ return MultiGet(v_column_families, v_keys);
+}
+
+Status Tracer::MultiGet(const size_t num_keys,
+ ColumnFamilyHandle* column_family, const Slice* keys) {
+ if (num_keys == 0) {
+ return Status::OK();
+ }
+ std::vector<ColumnFamilyHandle*> column_families;
+ std::vector<Slice> v_keys;
+ column_families.resize(num_keys);
+ v_keys.resize(num_keys);
+ for (size_t i = 0; i < num_keys; i++) {
+ column_families[i] = column_family;
+ v_keys[i] = keys[i];
+ }
+ return MultiGet(column_families, v_keys);
+}
+
+Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
+ const std::vector<Slice>& keys) {
+ if (column_families.size() != keys.size()) {
+ return Status::Corruption("the CFs size and keys size does not match!");
+ }
+ TraceType trace_type = kTraceMultiGet;
+ if (ShouldSkipTrace(trace_type)) {
+ return Status::OK();
+ }
+ uint32_t multiget_size = static_cast<uint32_t>(keys.size());
+ Trace trace;
+ trace.ts = clock_->NowMicros();
trace.type = trace_type;
- EncodeCFAndKey(&trace.payload, cf_id, key);
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kMultiGetSize);
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kMultiGetCFIDs);
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kMultiGetKeys);
+ // Encode the CFIDs inorder
+ std::string cfids_payload;
+ std::string keys_payload;
+ for (uint32_t i = 0; i < multiget_size; i++) {
+ assert(i < column_families.size());
+ assert(i < keys.size());
+ PutFixed32(&cfids_payload, column_families[i]->GetID());
+ PutLengthPrefixedSlice(&keys_payload, keys[i]);
+ }
+ // Encode the Get struct members into payload. Make sure add them in order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, multiget_size);
+ PutLengthPrefixedSlice(&trace.payload, cfids_payload);
+ PutLengthPrefixedSlice(&trace.payload, keys_payload);
return WriteTrace(trace);
}
if (IsTraceFileOverMax()) {
return true;
}
- if ((trace_options_.filter & kTraceFilterGet
- && trace_type == kTraceGet)
- || (trace_options_.filter & kTraceFilterWrite
- && trace_type == kTraceWrite)) {
+
+ TraceFilterType filter_mask = kTraceFilterNone;
+ switch (trace_type) {
+ case kTraceNone:
+ case kTraceBegin:
+ case kTraceEnd:
+ filter_mask = kTraceFilterNone;
+ break;
+ case kTraceWrite:
+ filter_mask = kTraceFilterWrite;
+ break;
+ case kTraceGet:
+ filter_mask = kTraceFilterGet;
+ break;
+ case kTraceIteratorSeek:
+ filter_mask = kTraceFilterIteratorSeek;
+ break;
+ case kTraceIteratorSeekForPrev:
+ filter_mask = kTraceFilterIteratorSeekForPrev;
+ break;
+ case kBlockTraceIndexBlock:
+ case kBlockTraceFilterBlock:
+ case kBlockTraceDataBlock:
+ case kBlockTraceUncompressionDictBlock:
+ case kBlockTraceRangeDeletionBlock:
+ case kIOTracer:
+ filter_mask = kTraceFilterNone;
+ break;
+ case kTraceMultiGet:
+ filter_mask = kTraceFilterMultiGet;
+ break;
+ case kTraceMax:
+ assert(false);
+ filter_mask = kTraceFilterNone;
+ break;
+ }
+ if (filter_mask != kTraceFilterNone && trace_options_.filter & filter_mask) {
return true;
}
+
++trace_request_count_;
if (trace_request_count_ < trace_options_.sampling_frequency) {
return true;
Status Tracer::WriteHeader() {
std::ostringstream s;
s << kTraceMagic << "\t"
- << "Trace Version: 0.1\t"
+ << "Trace Version: " << kTraceFileMajorVersion << "."
+ << kTraceFileMinorVersion << "\t"
<< "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
<< "Format: Timestamp OpType Payload\n";
std::string header(s.str());
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
trace.type = kTraceBegin;
trace.payload = header;
return WriteTrace(trace);
Status Tracer::WriteFooter() {
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
trace.type = kTraceEnd;
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kEmptyPayload);
trace.payload = "";
return WriteTrace(trace);
}
Status Tracer::Close() { return WriteFooter(); }
-Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
- std::unique_ptr<TraceReader>&& reader)
- : trace_reader_(std::move(reader)) {
- assert(db != nullptr);
- db_ = static_cast<DBImpl*>(db->GetRootDB());
- env_ = Env::Default();
- for (ColumnFamilyHandle* cfh : handles) {
- cf_map_[cfh->GetID()] = cfh;
- }
- fast_forward_ = 1;
-}
-
-Replayer::~Replayer() { trace_reader_.reset(); }
-
-Status Replayer::SetFastForward(uint32_t fast_forward) {
- Status s;
- if (fast_forward < 1) {
- s = Status::InvalidArgument("Wrong fast forward speed!");
- } else {
- fast_forward_ = fast_forward;
- s = Status::OK();
- }
- return s;
-}
-
-Status Replayer::Replay() {
- Status s;
- Trace header;
- s = ReadHeader(&header);
- if (!s.ok()) {
- return s;
- }
-
- std::chrono::system_clock::time_point replay_epoch =
- std::chrono::system_clock::now();
- WriteOptions woptions;
- ReadOptions roptions;
- Trace trace;
- uint64_t ops = 0;
- Iterator* single_iter = nullptr;
- while (s.ok()) {
- trace.reset();
- s = ReadTrace(&trace);
- if (!s.ok()) {
- break;
- }
-
- std::this_thread::sleep_until(
- replay_epoch +
- std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
- if (trace.type == kTraceWrite) {
- WriteBatch batch(trace.payload);
- db_->Write(woptions, &batch);
- ops++;
- } else if (trace.type == kTraceGet) {
- uint32_t cf_id = 0;
- Slice key;
- DecodeCFAndKey(trace.payload, &cf_id, &key);
- if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
-
- std::string value;
- if (cf_id == 0) {
- db_->Get(roptions, key, &value);
- } else {
- db_->Get(roptions, cf_map_[cf_id], key, &value);
- }
- ops++;
- } else if (trace.type == kTraceIteratorSeek) {
- uint32_t cf_id = 0;
- Slice key;
- DecodeCFAndKey(trace.payload, &cf_id, &key);
- if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
-
- if (cf_id == 0) {
- single_iter = db_->NewIterator(roptions);
- } else {
- single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
- }
- single_iter->Seek(key);
- ops++;
- delete single_iter;
- } else if (trace.type == kTraceIteratorSeekForPrev) {
- // Currently, only support to call the Seek()
- uint32_t cf_id = 0;
- Slice key;
- DecodeCFAndKey(trace.payload, &cf_id, &key);
- if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
- return Status::Corruption("Invalid Column Family ID.");
- }
-
- if (cf_id == 0) {
- single_iter = db_->NewIterator(roptions);
- } else {
- single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
- }
- single_iter->SeekForPrev(key);
- ops++;
- delete single_iter;
- } else if (trace.type == kTraceEnd) {
- // Do nothing for now.
- // TODO: Add some validations later.
- break;
- }
- }
-
- if (s.IsIncomplete()) {
- // Reaching eof returns Incomplete status at the moment.
- // Could happen when killing a process without calling EndTrace() API.
- // TODO: Add better error handling.
- return Status::OK();
- }
- return s;
-}
-
-// The trace can be replayed with multithread by configurnge the number of
-// threads in the thread pool. Trace records are read from the trace file
-// sequentially and the corresponding queries are scheduled in the task
-// queue based on the timestamp. Currently, we support Write_batch (Put,
-// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
-Status Replayer::MultiThreadReplay(uint32_t threads_num) {
- Status s;
- Trace header;
- s = ReadHeader(&header);
- if (!s.ok()) {
- return s;
- }
-
- ThreadPoolImpl thread_pool;
- thread_pool.SetHostEnv(env_);
-
- if (threads_num > 1) {
- thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
- } else {
- thread_pool.SetBackgroundThreads(1);
- }
-
- std::chrono::system_clock::time_point replay_epoch =
- std::chrono::system_clock::now();
- WriteOptions woptions;
- ReadOptions roptions;
- uint64_t ops = 0;
- while (s.ok()) {
- std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
- ra->db = db_;
- s = ReadTrace(&(ra->trace_entry));
- if (!s.ok()) {
- break;
- }
- ra->cf_map = &cf_map_;
- ra->woptions = woptions;
- ra->roptions = roptions;
-
- std::this_thread::sleep_until(
- replay_epoch + std::chrono::microseconds(
- (ra->trace_entry.ts - header.ts) / fast_forward_));
- if (ra->trace_entry.type == kTraceWrite) {
- thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceGet) {
- thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceIteratorSeek) {
- thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
- nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
- thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
- nullptr, nullptr);
- ops++;
- } else if (ra->trace_entry.type == kTraceEnd) {
- // Do nothing for now.
- // TODO: Add some validations later.
- break;
- } else {
- // Other trace entry types that are not implemented for replay.
- // To finish the replay, we continue the process.
- continue;
- }
- }
-
- if (s.IsIncomplete()) {
- // Reaching eof returns Incomplete status at the moment.
- // Could happen when killing a process without calling EndTrace() API.
- // TODO: Add better error handling.
- s = Status::OK();
- }
- thread_pool.JoinAllThreads();
- return s;
-}
-
-Status Replayer::ReadHeader(Trace* header) {
- assert(header != nullptr);
- Status s = ReadTrace(header);
- if (!s.ok()) {
- return s;
- }
- if (header->type != kTraceBegin) {
- return Status::Corruption("Corrupted trace file. Incorrect header.");
- }
- if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
- return Status::Corruption("Corrupted trace file. Incorrect magic.");
- }
-
- return s;
-}
-
-Status Replayer::ReadFooter(Trace* footer) {
- assert(footer != nullptr);
- Status s = ReadTrace(footer);
- if (!s.ok()) {
- return s;
- }
- if (footer->type != kTraceEnd) {
- return Status::Corruption("Corrupted trace file. Incorrect footer.");
- }
-
- // TODO: Add more validations later
- return s;
-}
-
-Status Replayer::ReadTrace(Trace* trace) {
- assert(trace != nullptr);
- std::string encoded_trace;
- Status s = trace_reader_->Read(&encoded_trace);
- if (!s.ok()) {
- return s;
- }
- return TracerHelper::DecodeTrace(encoded_trace, trace);
-}
-
-void Replayer::BGWorkGet(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- uint32_t cf_id = 0;
- Slice key;
- DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
- if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
- return;
- }
-
- std::string value;
- if (cf_id == 0) {
- ra->db->Get(ra->roptions, key, &value);
- } else {
- ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
- }
-
- return;
-}
-
-void Replayer::BGWorkWriteBatch(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- WriteBatch batch(ra->trace_entry.payload);
- ra->db->Write(ra->woptions, &batch);
- return;
-}
-
-void Replayer::BGWorkIterSeek(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- uint32_t cf_id = 0;
- Slice key;
- DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
- if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
- return;
- }
-
- std::string value;
- Iterator* single_iter = nullptr;
- if (cf_id == 0) {
- single_iter = ra->db->NewIterator(ra->roptions);
- } else {
- single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
- }
- single_iter->Seek(key);
- delete single_iter;
- return;
-}
-
-void Replayer::BGWorkIterSeekForPrev(void* arg) {
- std::unique_ptr<ReplayerWorkerArg> ra(
- reinterpret_cast<ReplayerWorkerArg*>(arg));
- assert(ra != nullptr);
- auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
- ra->cf_map);
- uint32_t cf_id = 0;
- Slice key;
- DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
- if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
- return;
- }
-
- std::string value;
- Iterator* single_iter = nullptr;
- if (cf_id == 0) {
- single_iter = ra->db->NewIterator(ra->roptions);
- } else {
- single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
- }
- single_iter->SeekForPrev(key);
- delete single_iter;
- return;
-}
-
} // namespace ROCKSDB_NAMESPACE