// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/trace_replay.h"

#include <chrono>
#include <sstream>
#include <thread>

#include "db/db_impl/db_impl.h"
#include "rocksdb/slice.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE {
// Magic string written at the start of every trace file; used by
// Replayer::ReadHeader() to validate that a file is a RocksDB trace.
const std::string kTraceMagic = "feedcafedeadbeef";
23 void EncodeCFAndKey(std::string
* dst
, uint32_t cf_id
, const Slice
& key
) {
24 PutFixed32(dst
, cf_id
);
25 PutLengthPrefixedSlice(dst
, key
);
28 void DecodeCFAndKey(std::string
& buffer
, uint32_t* cf_id
, Slice
* key
) {
30 GetFixed32(&buf
, cf_id
);
31 GetLengthPrefixedSlice(&buf
, key
);
35 void TracerHelper::EncodeTrace(const Trace
& trace
, std::string
* encoded_trace
) {
36 assert(encoded_trace
);
37 PutFixed64(encoded_trace
, trace
.ts
);
38 encoded_trace
->push_back(trace
.type
);
39 PutFixed32(encoded_trace
, static_cast<uint32_t>(trace
.payload
.size()));
40 encoded_trace
->append(trace
.payload
);
43 Status
TracerHelper::DecodeTrace(const std::string
& encoded_trace
,
45 assert(trace
!= nullptr);
46 Slice enc_slice
= Slice(encoded_trace
);
47 if (!GetFixed64(&enc_slice
, &trace
->ts
)) {
48 return Status::Incomplete("Decode trace string failed");
50 if (enc_slice
.size() < kTraceTypeSize
+ kTracePayloadLengthSize
) {
51 return Status::Incomplete("Decode trace string failed");
53 trace
->type
= static_cast<TraceType
>(enc_slice
[0]);
54 enc_slice
.remove_prefix(kTraceTypeSize
+ kTracePayloadLengthSize
);
55 trace
->payload
= enc_slice
.ToString();
59 Tracer::Tracer(Env
* env
, const TraceOptions
& trace_options
,
60 std::unique_ptr
<TraceWriter
>&& trace_writer
)
62 trace_options_(trace_options
),
63 trace_writer_(std::move(trace_writer
)),
64 trace_request_count_ (0) {
68 Tracer::~Tracer() { trace_writer_
.reset(); }
70 Status
Tracer::Write(WriteBatch
* write_batch
) {
71 TraceType trace_type
= kTraceWrite
;
72 if (ShouldSkipTrace(trace_type
)) {
76 trace
.ts
= env_
->NowMicros();
77 trace
.type
= trace_type
;
78 trace
.payload
= write_batch
->Data();
79 return WriteTrace(trace
);
82 Status
Tracer::Get(ColumnFamilyHandle
* column_family
, const Slice
& key
) {
83 TraceType trace_type
= kTraceGet
;
84 if (ShouldSkipTrace(trace_type
)) {
88 trace
.ts
= env_
->NowMicros();
89 trace
.type
= trace_type
;
90 EncodeCFAndKey(&trace
.payload
, column_family
->GetID(), key
);
91 return WriteTrace(trace
);
94 Status
Tracer::IteratorSeek(const uint32_t& cf_id
, const Slice
& key
) {
95 TraceType trace_type
= kTraceIteratorSeek
;
96 if (ShouldSkipTrace(trace_type
)) {
100 trace
.ts
= env_
->NowMicros();
101 trace
.type
= trace_type
;
102 EncodeCFAndKey(&trace
.payload
, cf_id
, key
);
103 return WriteTrace(trace
);
106 Status
Tracer::IteratorSeekForPrev(const uint32_t& cf_id
, const Slice
& key
) {
107 TraceType trace_type
= kTraceIteratorSeekForPrev
;
108 if (ShouldSkipTrace(trace_type
)) {
112 trace
.ts
= env_
->NowMicros();
113 trace
.type
= trace_type
;
114 EncodeCFAndKey(&trace
.payload
, cf_id
, key
);
115 return WriteTrace(trace
);
118 bool Tracer::ShouldSkipTrace(const TraceType
& trace_type
) {
119 if (IsTraceFileOverMax()) {
122 if ((trace_options_
.filter
& kTraceFilterGet
123 && trace_type
== kTraceGet
)
124 || (trace_options_
.filter
& kTraceFilterWrite
125 && trace_type
== kTraceWrite
)) {
128 ++trace_request_count_
;
129 if (trace_request_count_
< trace_options_
.sampling_frequency
) {
132 trace_request_count_
= 0;
136 bool Tracer::IsTraceFileOverMax() {
137 uint64_t trace_file_size
= trace_writer_
->GetFileSize();
138 return (trace_file_size
> trace_options_
.max_trace_file_size
);
141 Status
Tracer::WriteHeader() {
142 std::ostringstream s
;
143 s
<< kTraceMagic
<< "\t"
144 << "Trace Version: 0.1\t"
145 << "RocksDB Version: " << kMajorVersion
<< "." << kMinorVersion
<< "\t"
146 << "Format: Timestamp OpType Payload\n";
147 std::string
header(s
.str());
150 trace
.ts
= env_
->NowMicros();
151 trace
.type
= kTraceBegin
;
152 trace
.payload
= header
;
153 return WriteTrace(trace
);
156 Status
Tracer::WriteFooter() {
158 trace
.ts
= env_
->NowMicros();
159 trace
.type
= kTraceEnd
;
161 return WriteTrace(trace
);
164 Status
Tracer::WriteTrace(const Trace
& trace
) {
165 std::string encoded_trace
;
166 TracerHelper::EncodeTrace(trace
, &encoded_trace
);
167 return trace_writer_
->Write(Slice(encoded_trace
));
170 Status
Tracer::Close() { return WriteFooter(); }
172 Replayer::Replayer(DB
* db
, const std::vector
<ColumnFamilyHandle
*>& handles
,
173 std::unique_ptr
<TraceReader
>&& reader
)
174 : trace_reader_(std::move(reader
)) {
175 assert(db
!= nullptr);
176 db_
= static_cast<DBImpl
*>(db
->GetRootDB());
177 env_
= Env::Default();
178 for (ColumnFamilyHandle
* cfh
: handles
) {
179 cf_map_
[cfh
->GetID()] = cfh
;
184 Replayer::~Replayer() { trace_reader_
.reset(); }
186 Status
Replayer::SetFastForward(uint32_t fast_forward
) {
188 if (fast_forward
< 1) {
189 s
= Status::InvalidArgument("Wrong fast forward speed!");
191 fast_forward_
= fast_forward
;
197 Status
Replayer::Replay() {
200 s
= ReadHeader(&header
);
205 std::chrono::system_clock::time_point replay_epoch
=
206 std::chrono::system_clock::now();
207 WriteOptions woptions
;
208 ReadOptions roptions
;
211 Iterator
* single_iter
= nullptr;
214 s
= ReadTrace(&trace
);
219 std::this_thread::sleep_until(
221 std::chrono::microseconds((trace
.ts
- header
.ts
) / fast_forward_
));
222 if (trace
.type
== kTraceWrite
) {
223 WriteBatch
batch(trace
.payload
);
224 db_
->Write(woptions
, &batch
);
226 } else if (trace
.type
== kTraceGet
) {
229 DecodeCFAndKey(trace
.payload
, &cf_id
, &key
);
230 if (cf_id
> 0 && cf_map_
.find(cf_id
) == cf_map_
.end()) {
231 return Status::Corruption("Invalid Column Family ID.");
236 db_
->Get(roptions
, key
, &value
);
238 db_
->Get(roptions
, cf_map_
[cf_id
], key
, &value
);
241 } else if (trace
.type
== kTraceIteratorSeek
) {
244 DecodeCFAndKey(trace
.payload
, &cf_id
, &key
);
245 if (cf_id
> 0 && cf_map_
.find(cf_id
) == cf_map_
.end()) {
246 return Status::Corruption("Invalid Column Family ID.");
250 single_iter
= db_
->NewIterator(roptions
);
252 single_iter
= db_
->NewIterator(roptions
, cf_map_
[cf_id
]);
254 single_iter
->Seek(key
);
257 } else if (trace
.type
== kTraceIteratorSeekForPrev
) {
258 // Currently, only support to call the Seek()
261 DecodeCFAndKey(trace
.payload
, &cf_id
, &key
);
262 if (cf_id
> 0 && cf_map_
.find(cf_id
) == cf_map_
.end()) {
263 return Status::Corruption("Invalid Column Family ID.");
267 single_iter
= db_
->NewIterator(roptions
);
269 single_iter
= db_
->NewIterator(roptions
, cf_map_
[cf_id
]);
271 single_iter
->SeekForPrev(key
);
274 } else if (trace
.type
== kTraceEnd
) {
275 // Do nothing for now.
276 // TODO: Add some validations later.
281 if (s
.IsIncomplete()) {
282 // Reaching eof returns Incomplete status at the moment.
283 // Could happen when killing a process without calling EndTrace() API.
284 // TODO: Add better error handling.
// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
295 Status
Replayer::MultiThreadReplay(uint32_t threads_num
) {
298 s
= ReadHeader(&header
);
303 ThreadPoolImpl thread_pool
;
304 thread_pool
.SetHostEnv(env_
);
306 if (threads_num
> 1) {
307 thread_pool
.SetBackgroundThreads(static_cast<int>(threads_num
));
309 thread_pool
.SetBackgroundThreads(1);
312 std::chrono::system_clock::time_point replay_epoch
=
313 std::chrono::system_clock::now();
314 WriteOptions woptions
;
315 ReadOptions roptions
;
318 std::unique_ptr
<ReplayerWorkerArg
> ra(new ReplayerWorkerArg
);
320 s
= ReadTrace(&(ra
->trace_entry
));
324 ra
->woptions
= woptions
;
325 ra
->roptions
= roptions
;
327 std::this_thread::sleep_until(
328 replay_epoch
+ std::chrono::microseconds(
329 (ra
->trace_entry
.ts
- header
.ts
) / fast_forward_
));
330 if (ra
->trace_entry
.type
== kTraceWrite
) {
331 thread_pool
.Schedule(&Replayer::BGWorkWriteBatch
, ra
.release(), nullptr,
334 } else if (ra
->trace_entry
.type
== kTraceGet
) {
335 thread_pool
.Schedule(&Replayer::BGWorkGet
, ra
.release(), nullptr,
338 } else if (ra
->trace_entry
.type
== kTraceIteratorSeek
) {
339 thread_pool
.Schedule(&Replayer::BGWorkIterSeek
, ra
.release(), nullptr,
342 } else if (ra
->trace_entry
.type
== kTraceIteratorSeekForPrev
) {
343 thread_pool
.Schedule(&Replayer::BGWorkIterSeekForPrev
, ra
.release(),
346 } else if (ra
->trace_entry
.type
== kTraceEnd
) {
347 // Do nothing for now.
348 // TODO: Add some validations later.
351 // Other trace entry types that are not implemented for replay.
352 // To finish the replay, we continue the process.
357 if (s
.IsIncomplete()) {
358 // Reaching eof returns Incomplete status at the moment.
359 // Could happen when killing a process without calling EndTrace() API.
360 // TODO: Add better error handling.
363 thread_pool
.JoinAllThreads();
367 Status
Replayer::ReadHeader(Trace
* header
) {
368 assert(header
!= nullptr);
369 Status s
= ReadTrace(header
);
373 if (header
->type
!= kTraceBegin
) {
374 return Status::Corruption("Corrupted trace file. Incorrect header.");
376 if (header
->payload
.substr(0, kTraceMagic
.length()) != kTraceMagic
) {
377 return Status::Corruption("Corrupted trace file. Incorrect magic.");
383 Status
Replayer::ReadFooter(Trace
* footer
) {
384 assert(footer
!= nullptr);
385 Status s
= ReadTrace(footer
);
389 if (footer
->type
!= kTraceEnd
) {
390 return Status::Corruption("Corrupted trace file. Incorrect footer.");
393 // TODO: Add more validations later
397 Status
Replayer::ReadTrace(Trace
* trace
) {
398 assert(trace
!= nullptr);
399 std::string encoded_trace
;
400 Status s
= trace_reader_
->Read(&encoded_trace
);
404 return TracerHelper::DecodeTrace(encoded_trace
, trace
);
407 void Replayer::BGWorkGet(void* arg
) {
408 std::unique_ptr
<ReplayerWorkerArg
> ra(
409 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
410 auto cf_map
= static_cast<std::unordered_map
<uint32_t, ColumnFamilyHandle
*>*>(
414 DecodeCFAndKey(ra
->trace_entry
.payload
, &cf_id
, &key
);
415 if (cf_id
> 0 && cf_map
->find(cf_id
) == cf_map
->end()) {
421 ra
->db
->Get(ra
->roptions
, key
, &value
);
423 ra
->db
->Get(ra
->roptions
, (*cf_map
)[cf_id
], key
, &value
);
429 void Replayer::BGWorkWriteBatch(void* arg
) {
430 std::unique_ptr
<ReplayerWorkerArg
> ra(
431 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
432 WriteBatch
batch(ra
->trace_entry
.payload
);
433 ra
->db
->Write(ra
->woptions
, &batch
);
437 void Replayer::BGWorkIterSeek(void* arg
) {
438 std::unique_ptr
<ReplayerWorkerArg
> ra(
439 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
440 auto cf_map
= static_cast<std::unordered_map
<uint32_t, ColumnFamilyHandle
*>*>(
444 DecodeCFAndKey(ra
->trace_entry
.payload
, &cf_id
, &key
);
445 if (cf_id
> 0 && cf_map
->find(cf_id
) == cf_map
->end()) {
450 Iterator
* single_iter
= nullptr;
452 single_iter
= ra
->db
->NewIterator(ra
->roptions
);
454 single_iter
= ra
->db
->NewIterator(ra
->roptions
, (*cf_map
)[cf_id
]);
456 single_iter
->Seek(key
);
461 void Replayer::BGWorkIterSeekForPrev(void* arg
) {
462 std::unique_ptr
<ReplayerWorkerArg
> ra(
463 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
464 auto cf_map
= static_cast<std::unordered_map
<uint32_t, ColumnFamilyHandle
*>*>(
468 DecodeCFAndKey(ra
->trace_entry
.payload
, &cf_id
, &key
);
469 if (cf_id
> 0 && cf_map
->find(cf_id
) == cf_map
->end()) {
474 Iterator
* single_iter
= nullptr;
476 single_iter
= ra
->db
->NewIterator(ra
->roptions
);
478 single_iter
= ra
->db
->NewIterator(ra
->roptions
, (*cf_map
)[cf_id
]);
480 single_iter
->SeekForPrev(key
);
}  // namespace ROCKSDB_NAMESPACE