1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/trace/replayer_impl.h"
13 #include "rocksdb/options.h"
14 #include "rocksdb/slice.h"
15 #include "rocksdb/system_clock.h"
16 #include "util/threadpool_imp.h"
18 namespace ROCKSDB_NAMESPACE
{
// Creates a replayer that reads encoded traces from `reader` (ownership is
// moved in) and executes decoded records against `db` / `handles` through the
// handler returned by TraceRecord::NewExecutionHandler(db, handles).
// Prepare() later parses the trace header and overwrites the
// trace_file_version_ placeholder initialized below.
20 ReplayerImpl::ReplayerImpl(DB
* db
,
21 const std::vector
<ColumnFamilyHandle
*>& handles
,
22 std::unique_ptr
<TraceReader
>&& reader
)
24 trace_reader_(std::move(reader
)),
// Handler that Execute() and BackgroundWork() dispatch decoded records to.
28 exec_handler_(TraceRecord::NewExecutionHandler(db
, handles
)),
// Placeholder until Prepare() parses the real version from the trace header.
30 trace_file_version_(-1) {}
// Destructor. Explicitly releases the execution handler first and the trace
// reader second.
32 ReplayerImpl::~ReplayerImpl() {
33 exec_handler_
.reset();
34 trace_reader_
.reset();
// Prepares the replayer by reading and parsing the trace header.
// On success, trace_file_version_ holds the trace format version used to
// decode subsequent records, and header_ts_ holds the header timestamp that
// Replay() uses as the time origin when scheduling each trace.
37 Status
ReplayerImpl::Prepare() {
// ReadHeader() rewinds the reader and decodes the first record as the header.
40 Status s
= ReadHeader(&header
);
// Extract the trace file version and the DB version recorded in the header.
44 s
= TracerHelper::ParseTraceHeader(header
, &trace_file_version_
, &db_version
);
48 header_ts_
= header
.ts
;
// Reads the next trace from the reader and decodes it into `*record`.
// Returns Status::Incomplete("Not prepared!") or Status::Incomplete("Trace
// end.") from its early-return guards (NOTE(review): presumably taken when
// Prepare() has not succeeded, or the trace end was already reached — confirm),
// and Status::Incomplete("Trace end.") when the trace just read is kTraceEnd.
// Decoding uses trace_file_version_ as parsed from the header by Prepare().
54 Status
ReplayerImpl::Next(std::unique_ptr
<TraceRecord
>* record
) {
56 return Status::Incomplete("Not prepared!");
59 return Status::Incomplete("Trace end.");
63 Status s
= ReadTrace(&trace
); // ReadTrace() reads from trace_reader_ while holding mutex_.
64 // Reached the trace end.
65 if (s
.ok() && trace
.type
== kTraceEnd
) {
67 return Status::Incomplete("Trace end.");
// Skip decoding when the read failed or the caller passed no output slot
// (presumably returns `s` — confirm).
69 if (!s
.ok() || record
== nullptr) {
73 return TracerHelper::DecodeTraceRecord(&trace
, trace_file_version_
, record
);
// Executes one decoded record by calling record->Accept() with the execution
// handler. `record` is dereferenced, so it must be non-null. `result` is passed
// through unchanged; Replay() passes nullptr when it has no result callback.
76 Status
ReplayerImpl::Execute(const std::unique_ptr
<TraceRecord
>& record
,
77 std::unique_ptr
<TraceRecordResult
>* result
) {
78 return record
->Accept(exec_handler_
.get(), result
);
// Replays the prepared trace against the DB.
//
// options.fast_forward must be > 0, otherwise InvalidArgument is returned.
// Each trace is delayed by (trace.ts - header_ts_) / options.fast_forward
// microseconds, presumably measured from replay_epoch (the moment replay
// starts).
//
// options.num_threads <= 1: replay runs on the calling thread. Each record is
// decoded, the thread sleeps until the record's scheduled time, and the record
// is then executed via Execute().
//
// Otherwise: a ThreadPoolImpl with options.num_threads threads is used. The
// calling thread sleeps first, then schedules BackgroundWork() for Write, Get,
// IteratorSeek, IteratorSeekForPrev and MultiGet records. Other trace types
// are reported to `result_callback` as NotSupported.
//
// `result_callback`, when non-null, receives per-record statuses and results.
// A kTraceEnd record ends the loop with Status::Incomplete("Trace end.").
81 Status
ReplayerImpl::Replay(
82 const ReplayOptions
& options
,
83 const std::function
<void(Status
, std::unique_ptr
<TraceRecordResult
>&&)>&
85 if (options
.fast_forward
<= 0.0) {
86 return Status::InvalidArgument("Wrong fast forward speed!");
90 return Status::Incomplete("Not prepared!");
93 return Status::Incomplete("Trace end.");
96 Status s
= Status::OK();
98 if (options
.num_threads
<= 1) {
99 // num_threads == 0 or num_threads == 1 uses single thread.
100 std::chrono::system_clock::time_point replay_epoch
=
101 std::chrono::system_clock::now();
105 s
= ReadTrace(&trace
);
106 // If already at trace end, ReadTrace should return Status::Incomplete().
111 // No need to sleep before breaking the loop if at the trace end.
112 if (trace
.type
== kTraceEnd
) {
114 s
= Status::Incomplete("Trace end.");
118 // In single-threaded replay, decode first then sleep.
119 std::unique_ptr
<TraceRecord
> record
;
120 s
= TracerHelper::DecodeTraceRecord(&trace
, trace_file_version_
, &record
);
121 if (!s
.ok() && !s
.IsNotSupported()) {
// NOTE(review): if trace.ts and header_ts_ are unsigned (header_ts_ is
// returned as uint64_t by GetHeaderTimestamp()), a trace timestamp earlier
// than the header timestamp wraps around and yields a huge sleep. Confirm
// that trace timestamps are never earlier than the header's.
125 std::chrono::system_clock::time_point sleep_to
=
127 std::chrono::microseconds(static_cast<uint64_t>(std::llround(
128 1.0 * (trace
.ts
- header_ts_
) / options
.fast_forward
)));
129 if (sleep_to
> std::chrono::system_clock::now()) {
130 std::this_thread::sleep_until(sleep_to
);
133 // Skip unsupported traces, stop for other errors.
134 if (s
.IsNotSupported()) {
135 if (result_callback
!= nullptr) {
136 result_callback(s
, nullptr);
142 if (result_callback
== nullptr) {
143 s
= Execute(record
, nullptr);
145 std::unique_ptr
<TraceRecordResult
> res
;
146 s
= Execute(record
, &res
);
147 result_callback(s
, std::move(res
));
151 // Multi-threaded replay.
152 ThreadPoolImpl thread_pool
;
153 thread_pool
.SetHostEnv(env_
);
154 thread_pool
.SetBackgroundThreads(static_cast<int>(options
.num_threads
));
157 // Background decoding and execution status.
158 Status bg_s
= Status::OK();
159 uint64_t last_err_ts
= static_cast<uint64_t>(-1);
160 // Callback function used in background work to update bg_s for the earliest
161 // TraceRecord which has execution error. This is different from the
162 // timestamp of the first execution error (either start or end timestamp).
164 // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution
165 // timestamps are T1_start, T1_end, T2_start, T2_end.
166 // Single-thread: there must be T1_start < T1_end < T2_start < T2_end.
167 // Multi-thread: T1_start < T2_start may not be enforced, and the relative
// order of the start/end timestamps is not guaranteed.
169 // In order to report the same `first` error in both single-thread and
170 // multi-thread replay, we can only rely on the TraceRecords' timestamps,
171 // rather than their execution timestamps. Although in single-thread replay,
172 // the first error is also the last error, while in multi-thread replay, the
173 // first error may not be the first error in execution, and it may not be
174 // the last error in execution as well.
// NOTE(review): in the BackgroundWork() code visible in this file, error_cb is
// only invoked on the decode-failure path; the status returned by
// record->Accept() is forwarded to result_cb only. If so, execution errors never
// reach bg_s in multi-threaded replay — confirm whether that is intended.
175 auto error_cb
= [&mtx
, &bg_s
, &last_err_ts
](Status err
, uint64_t err_ts
) {
176 std::lock_guard
<std::mutex
> gd(mtx
);
177 // Only record the first error.
178 if (!err
.ok() && !err
.IsNotSupported() && err_ts
< last_err_ts
) {
180 last_err_ts
= err_ts
;
184 std::chrono::system_clock::time_point replay_epoch
=
185 std::chrono::system_clock::now();
187 while (bg_s
.ok() && s
.ok()) {
189 s
= ReadTrace(&trace
);
190 // If already at trace end, ReadTrace should return Status::Incomplete().
195 TraceType trace_type
= trace
.type
;
197 // No need to sleep before breaking the loop if at the trace end.
198 if (trace_type
== kTraceEnd
) {
200 s
= Status::Incomplete("Trace end.");
204 // In multi-threaded replay, sleep first then start decoding and
205 // execution in a thread.
206 std::chrono::system_clock::time_point sleep_to
=
208 std::chrono::microseconds(static_cast<uint64_t>(std::llround(
209 1.0 * (trace
.ts
- header_ts_
) / options
.fast_forward
)));
210 if (sleep_to
> std::chrono::system_clock::now()) {
211 std::this_thread::sleep_until(sleep_to
);
// Only these trace types are handed to the worker threads.
214 if (trace_type
== kTraceWrite
|| trace_type
== kTraceGet
||
215 trace_type
== kTraceIteratorSeek
||
216 trace_type
== kTraceIteratorSeekForPrev
||
217 trace_type
== kTraceMultiGet
) {
218 std::unique_ptr
<ReplayerWorkerArg
> ra(new ReplayerWorkerArg
);
219 ra
->trace_entry
= std::move(trace
);
220 ra
->handler
= exec_handler_
.get();
221 ra
->trace_file_version
= trace_file_version_
;
222 ra
->error_cb
= error_cb
;
223 ra
->result_cb
= result_callback
;
// Ownership of the worker argument passes to BackgroundWork(), which
// re-wraps it in a unique_ptr.
224 thread_pool
.Schedule(&ReplayerImpl::BackgroundWork
, ra
.release(),
227 // Skip unsupported traces.
228 if (result_callback
!= nullptr) {
229 result_callback(Status::NotSupported("Unsupported trace type."),
235 thread_pool
.WaitForJobsAndJoinAllThreads();
241 if (s
.IsIncomplete()) {
242 // Reaching eof returns Incomplete status at the moment.
243 // Could happen when killing a process without calling EndTrace() API.
244 // TODO: Add better error handling.
251 uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_
; }
// Rewinds the trace reader to the beginning via Reset(), reads the first
// encoded record, and decodes it as the trace header into `*header`
// (which must be non-null).
253 Status
ReplayerImpl::ReadHeader(Trace
* header
) {
254 assert(header
!= nullptr);
255 Status s
= trace_reader_
->Reset();
259 std::string encoded_trace
;
260 // Read the trace header record.
261 s
= trace_reader_
->Read(&encoded_trace
);
266 return TracerHelper::DecodeHeader(encoded_trace
, header
);
// Reads the next encoded record from trace_reader_ and decodes it into
// `*trace` (which must be non-null). The read from the reader is serialized
// with mutex_ so that multiple threads may call this.
269 Status
ReplayerImpl::ReadTrace(Trace
* trace
) {
270 assert(trace
!= nullptr);
271 std::string encoded_trace
;
272 // We don't know whether the TraceReader implementation is thread-safe, so
273 // reading from it is protected with a mutex. Decoding does not need to be
274 // protected since it only touches local data.
276 std::lock_guard
<std::mutex
> guard(mutex_
);
277 Status s
= trace_reader_
->Read(&encoded_trace
);
282 return TracerHelper::DecodeTrace(encoded_trace
, trace
);
// Thread-pool task used by multi-threaded Replay().
// `arg` is a ReplayerWorkerArg that Replay() allocated and handed over with
// release(); wrapping it in a unique_ptr here frees it when the task ends.
// The trace entry is decoded with ra->trace_file_version. The resulting
// record is then executed through ra->handler. When ra->result_cb is set, it
// receives the execution status and result.
285 void ReplayerImpl::BackgroundWork(void* arg
) {
286 std::unique_ptr
<ReplayerWorkerArg
> ra(
287 reinterpret_cast<ReplayerWorkerArg
*>(arg
));
288 assert(ra
!= nullptr);
290 std::unique_ptr
<TraceRecord
> record
;
291 Status s
= TracerHelper::DecodeTraceRecord(&(ra
->trace_entry
),
292 ra
->trace_file_version
, &record
);
// NOTE(review): presumably reached only when decoding failed (guard not
// visible here). The decode status goes to error_cb with the trace timestamp,
// so Replay() can stop, and to result_cb with a null result.
295 if (ra
->error_cb
!= nullptr) {
296 ra
->error_cb(s
, ra
->trace_entry
.ts
);
299 if (ra
->result_cb
!= nullptr) {
300 ra
->result_cb(s
, nullptr);
// Execute the decoded record.
// NOTE(review): the status returned by Accept() is only forwarded to
// result_cb and never to error_cb. An execution error therefore does not set
// bg_s in Replay(), and multi-threaded replay keeps going where single-threaded
// replay would stop. Consider calling ra->error_cb(s, ra->trace_entry.ts) after
// Accept() when error_cb is non-null.
305 if (ra
->result_cb
== nullptr) {
306 s
= record
->Accept(ra
->handler
, nullptr);
308 std::unique_ptr
<TraceRecordResult
> res
;
309 s
= record
->Accept(ra
->handler
, &res
);
310 ra
->result_cb(s
, std::move(res
));
315 } // namespace ROCKSDB_NAMESPACE
316 #endif // ROCKSDB_LITE