// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/trace_replay.h"

#include <chrono>
#include <sstream>
#include <thread>

#include "db/db_impl/db_impl.h"
#include "rocksdb/slice.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
#include "util/threadpool_imp.h"
18 namespace ROCKSDB_NAMESPACE
{
// Magic string placed at the start of the trace-file header payload so a
// reader can recognize a RocksDB trace file.
const std::string kTraceMagic = "feedcafedeadbeef";
23 void EncodeCFAndKey(std::string
* dst
, uint32_t cf_id
, const Slice
& key
) {
24 PutFixed32(dst
, cf_id
);
25 PutLengthPrefixedSlice(dst
, key
);
28 void DecodeCFAndKey(std::string
& buffer
, uint32_t* cf_id
, Slice
* key
) {
30 GetFixed32(&buf
, cf_id
);
31 GetLengthPrefixedSlice(&buf
, key
);
35 void TracerHelper::EncodeTrace(const Trace
& trace
, std::string
* encoded_trace
) {
36 assert(encoded_trace
);
37 PutFixed64(encoded_trace
, trace
.ts
);
38 encoded_trace
->push_back(trace
.type
);
39 PutFixed32(encoded_trace
, static_cast<uint32_t>(trace
.payload
.size()));
40 encoded_trace
->append(trace
.payload
);
43 Status
TracerHelper::DecodeTrace(const std::string
& encoded_trace
,
45 assert(trace
!= nullptr);
46 Slice enc_slice
= Slice(encoded_trace
);
47 if (!GetFixed64(&enc_slice
, &trace
->ts
)) {
48 return Status::Incomplete("Decode trace string failed");
50 if (enc_slice
.size() < kTraceTypeSize
+ kTracePayloadLengthSize
) {
51 return Status::Incomplete("Decode trace string failed");
53 trace
->type
= static_cast<TraceType
>(enc_slice
[0]);
54 enc_slice
.remove_prefix(kTraceTypeSize
+ kTracePayloadLengthSize
);
55 trace
->payload
= enc_slice
.ToString();
59 Tracer::Tracer(Env
* env
, const TraceOptions
& trace_options
,
60 std::unique_ptr
<TraceWriter
>&& trace_writer
)
62 trace_options_(trace_options
),
63 trace_writer_(std::move(trace_writer
)),
64 trace_request_count_ (0) {
65 // TODO: What if this fails?
66 WriteHeader().PermitUncheckedError();
69 Tracer::~Tracer() { trace_writer_
.reset(); }
71 Status
Tracer::Write(WriteBatch
* write_batch
) {
72 TraceType trace_type
= kTraceWrite
;
73 if (ShouldSkipTrace(trace_type
)) {
77 trace
.ts
= env_
->NowMicros();
78 trace
.type
= trace_type
;
79 trace
.payload
= write_batch
->Data();
80 return WriteTrace(trace
);
83 Status
Tracer::Get(ColumnFamilyHandle
* column_family
, const Slice
& key
) {
84 TraceType trace_type
= kTraceGet
;
85 if (ShouldSkipTrace(trace_type
)) {
89 trace
.ts
= env_
->NowMicros();
90 trace
.type
= trace_type
;
91 EncodeCFAndKey(&trace
.payload
, column_family
->GetID(), key
);
92 return WriteTrace(trace
);
95 Status
Tracer::IteratorSeek(const uint32_t& cf_id
, const Slice
& key
) {
96 TraceType trace_type
= kTraceIteratorSeek
;
97 if (ShouldSkipTrace(trace_type
)) {
101 trace
.ts
= env_
->NowMicros();
102 trace
.type
= trace_type
;
103 EncodeCFAndKey(&trace
.payload
, cf_id
, key
);
104 return WriteTrace(trace
);
107 Status
Tracer::IteratorSeekForPrev(const uint32_t& cf_id
, const Slice
& key
) {
108 TraceType trace_type
= kTraceIteratorSeekForPrev
;
109 if (ShouldSkipTrace(trace_type
)) {
113 trace
.ts
= env_
->NowMicros();
114 trace
.type
= trace_type
;
115 EncodeCFAndKey(&trace
.payload
, cf_id
, key
);
116 return WriteTrace(trace
);
119 bool Tracer::ShouldSkipTrace(const TraceType
& trace_type
) {
120 if (IsTraceFileOverMax()) {
123 if ((trace_options_
.filter
& kTraceFilterGet
124 && trace_type
== kTraceGet
)
125 || (trace_options_
.filter
& kTraceFilterWrite
126 && trace_type
== kTraceWrite
)) {
129 ++trace_request_count_
;
130 if (trace_request_count_
< trace_options_
.sampling_frequency
) {
133 trace_request_count_
= 0;
137 bool Tracer::IsTraceFileOverMax() {
138 uint64_t trace_file_size
= trace_writer_
->GetFileSize();
139 return (trace_file_size
> trace_options_
.max_trace_file_size
);
142 Status
Tracer::WriteHeader() {
143 std::ostringstream s
;
144 s
<< kTraceMagic
<< "\t"
145 << "Trace Version: 0.1\t"
146 << "RocksDB Version: " << kMajorVersion
<< "." << kMinorVersion
<< "\t"
147 << "Format: Timestamp OpType Payload\n";
148 std::string
header(s
.str());
151 trace
.ts
= env_
->NowMicros();
152 trace
.type
= kTraceBegin
;
153 trace
.payload
= header
;
154 return WriteTrace(trace
);
157 Status
Tracer::WriteFooter() {
159 trace
.ts
= env_
->NowMicros();
160 trace
.type
= kTraceEnd
;
162 return WriteTrace(trace
);
165 Status
Tracer::WriteTrace(const Trace
& trace
) {
166 std::string encoded_trace
;
167 TracerHelper::EncodeTrace(trace
, &encoded_trace
);
168 return trace_writer_
->Write(Slice(encoded_trace
));
171 Status
Tracer::Close() { return WriteFooter(); }
173 Replayer::Replayer(DB
* db
, const std::vector
<ColumnFamilyHandle
*>& handles
,
174 std::unique_ptr
<TraceReader
>&& reader
)
175 : trace_reader_(std::move(reader
)) {
176 assert(db
!= nullptr);
177 db_
= static_cast<DBImpl
*>(db
->GetRootDB());
178 env_
= Env::Default();
179 for (ColumnFamilyHandle
* cfh
: handles
) {
180 cf_map_
[cfh
->GetID()] = cfh
;
185 Replayer::~Replayer() { trace_reader_
.reset(); }
187 Status
Replayer::SetFastForward(uint32_t fast_forward
) {
189 if (fast_forward
< 1) {
190 s
= Status::InvalidArgument("Wrong fast forward speed!");
192 fast_forward_
= fast_forward
;
198 Status
Replayer::Replay() {
201 s
= ReadHeader(&header
);
206 std::chrono::system_clock::time_point replay_epoch
=
207 std::chrono::system_clock::now();
208 WriteOptions woptions
;
209 ReadOptions roptions
;
212 Iterator
* single_iter
= nullptr;
215 s
= ReadTrace(&trace
);
220 std::this_thread::sleep_until(
222 std::chrono::microseconds((trace
.ts
- header
.ts
) / fast_forward_
));
223 if (trace
.type
== kTraceWrite
) {
224 WriteBatch
batch(trace
.payload
);
225 db_
->Write(woptions
, &batch
);
227 } else if (trace
.type
== kTraceGet
) {
230 DecodeCFAndKey(trace
.payload
, &cf_id
, &key
);
231 if (cf_id
> 0 && cf_map_
.find(cf_id
) == cf_map_
.end()) {
232 return Status::Corruption("Invalid Column Family ID.");
237 db_
->Get(roptions
, key
, &value
);
239 db_
->Get(roptions
, cf_map_
[cf_id
], key
, &value
);
242 } else if (trace
.type
== kTraceIteratorSeek
) {
245 DecodeCFAndKey(trace
.payload
, &cf_id
, &key
);
246 if (cf_id
> 0 && cf_map_
.find(cf_id
) == cf_map_
.end()) {
247 return Status::Corruption("Invalid Column Family ID.");
251 single_iter
= db_
->NewIterator(roptions
);
253 single_iter
= db_
->NewIterator(roptions
, cf_map_
[cf_id
]);
255 single_iter
->Seek(key
);
258 } else if (trace
.type
== kTraceIteratorSeekForPrev
) {
259 // Currently, only support to call the Seek()
262 DecodeCFAndKey(trace
.payload
, &cf_id
, &key
);
263 if (cf_id
> 0 && cf_map_
.find(cf_id
) == cf_map_
.end()) {
264 return Status::Corruption("Invalid Column Family ID.");
268 single_iter
= db_
->NewIterator(roptions
);
270 single_iter
= db_
->NewIterator(roptions
, cf_map_
[cf_id
]);
272 single_iter
->SeekForPrev(key
);
275 } else if (trace
.type
== kTraceEnd
) {
276 // Do nothing for now.
277 // TODO: Add some validations later.
282 if (s
.IsIncomplete()) {
283 // Reaching eof returns Incomplete status at the moment.
284 // Could happen when killing a process without calling EndTrace() API.
285 // TODO: Add better error handling.
291 // The trace can be replayed with multithread by configurnge the number of
292 // threads in the thread pool. Trace records are read from the trace file
293 // sequentially and the corresponding queries are scheduled in the task
294 // queue based on the timestamp. Currently, we support Write_batch (Put,
295 // Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
296 Status
Replayer::MultiThreadReplay(uint32_t threads_num
) {
299 s
= ReadHeader(&header
);
304 ThreadPoolImpl thread_pool
;
305 thread_pool
.SetHostEnv(env_
);
307 if (threads_num
> 1) {
308 thread_pool
.SetBackgroundThreads(static_cast<int>(threads_num
));
310 thread_pool
.SetBackgroundThreads(1);
313 std::chrono::system_clock::time_point replay_epoch
=
314 std::chrono::system_clock::now();
315 WriteOptions woptions
;
316 ReadOptions roptions
;
319 std::unique_ptr
<ReplayerWorkerArg
> ra(new ReplayerWorkerArg
);
321 s
= ReadTrace(&(ra
->trace_entry
));
325 ra
->cf_map
= &cf_map_
;
326 ra
->woptions
= woptions
;
327 ra
->roptions
= roptions
;
329 std::this_thread::sleep_until(
330 replay_epoch
+ std::chrono::microseconds(
331 (ra
->trace_entry
.ts
- header
.ts
) / fast_forward_
));
332 if (ra
->trace_entry
.type
== kTraceWrite
) {
333 thread_pool
.Schedule(&Replayer::BGWorkWriteBatch
, ra
.release(), nullptr,
336 } else if (ra
->trace_entry
.type
== kTraceGet
) {
337 thread_pool
.Schedule(&Replayer::BGWorkGet
, ra
.release(), nullptr,
340 } else if (ra
->trace_entry
.type
== kTraceIteratorSeek
) {
341 thread_pool
.Schedule(&Replayer::BGWorkIterSeek
, ra
.release(), nullptr,
344 } else if (ra
->trace_entry
.type
== kTraceIteratorSeekForPrev
) {
345 thread_pool
.Schedule(&Replayer::BGWorkIterSeekForPrev
, ra
.release(),
348 } else if (ra
->trace_entry
.type
== kTraceEnd
) {
349 // Do nothing for now.
350 // TODO: Add some validations later.
353 // Other trace entry types that are not implemented for replay.
354 // To finish the replay, we continue the process.
359 if (s
.IsIncomplete()) {
360 // Reaching eof returns Incomplete status at the moment.
361 // Could happen when killing a process without calling EndTrace() API.
362 // TODO: Add better error handling.
365 thread_pool
.JoinAllThreads();
369 Status
Replayer::ReadHeader(Trace
* header
) {
370 assert(header
!= nullptr);
371 Status s
= ReadTrace(header
);
375 if (header
->type
!= kTraceBegin
) {
376 return Status::Corruption("Corrupted trace file. Incorrect header.");
378 if (header
->payload
.substr(0, kTraceMagic
.length()) != kTraceMagic
) {
379 return Status::Corruption("Corrupted trace file. Incorrect magic.");
385 Status
Replayer::ReadFooter(Trace
* footer
) {
386 assert(footer
!= nullptr);
387 Status s
= ReadTrace(footer
);
391 if (footer
->type
!= kTraceEnd
) {
392 return Status::Corruption("Corrupted trace file. Incorrect footer.");
395 // TODO: Add more validations later
399 Status
Replayer::ReadTrace(Trace
* trace
) {
400 assert(trace
!= nullptr);
401 std::string encoded_trace
;
402 Status s
= trace_reader_
->Read(&encoded_trace
);
406 return TracerHelper::DecodeTrace(encoded_trace
, trace
);
409 void Replayer::BGWorkGet(void* arg
) {
410 std::unique_ptr
<ReplayerWorkerArg
> ra(
411 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
412 assert(ra
!= nullptr);
413 auto cf_map
= static_cast<std::unordered_map
<uint32_t, ColumnFamilyHandle
*>*>(
417 DecodeCFAndKey(ra
->trace_entry
.payload
, &cf_id
, &key
);
418 if (cf_id
> 0 && cf_map
->find(cf_id
) == cf_map
->end()) {
424 ra
->db
->Get(ra
->roptions
, key
, &value
);
426 ra
->db
->Get(ra
->roptions
, (*cf_map
)[cf_id
], key
, &value
);
432 void Replayer::BGWorkWriteBatch(void* arg
) {
433 std::unique_ptr
<ReplayerWorkerArg
> ra(
434 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
435 assert(ra
!= nullptr);
436 WriteBatch
batch(ra
->trace_entry
.payload
);
437 ra
->db
->Write(ra
->woptions
, &batch
);
441 void Replayer::BGWorkIterSeek(void* arg
) {
442 std::unique_ptr
<ReplayerWorkerArg
> ra(
443 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
444 assert(ra
!= nullptr);
445 auto cf_map
= static_cast<std::unordered_map
<uint32_t, ColumnFamilyHandle
*>*>(
449 DecodeCFAndKey(ra
->trace_entry
.payload
, &cf_id
, &key
);
450 if (cf_id
> 0 && cf_map
->find(cf_id
) == cf_map
->end()) {
455 Iterator
* single_iter
= nullptr;
457 single_iter
= ra
->db
->NewIterator(ra
->roptions
);
459 single_iter
= ra
->db
->NewIterator(ra
->roptions
, (*cf_map
)[cf_id
]);
461 single_iter
->Seek(key
);
466 void Replayer::BGWorkIterSeekForPrev(void* arg
) {
467 std::unique_ptr
<ReplayerWorkerArg
> ra(
468 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
469 assert(ra
!= nullptr);
470 auto cf_map
= static_cast<std::unordered_map
<uint32_t, ColumnFamilyHandle
*>*>(
474 DecodeCFAndKey(ra
->trace_entry
.payload
, &cf_id
, &key
);
475 if (cf_id
> 0 && cf_map
->find(cf_id
) == cf_map
->end()) {
480 Iterator
* single_iter
= nullptr;
482 single_iter
= ra
->db
->NewIterator(ra
->roptions
);
484 single_iter
= ra
->db
->NewIterator(ra
->roptions
, (*cf_map
)[cf_id
]);
486 single_iter
->SeekForPrev(key
);
491 } // namespace ROCKSDB_NAMESPACE