]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/trace/replayer_impl.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / trace / replayer_impl.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/trace/replayer_impl.h"
9
10 #include <cmath>
11 #include <thread>
12
13 #include "rocksdb/options.h"
14 #include "rocksdb/slice.h"
15 #include "rocksdb/system_clock.h"
16 #include "util/threadpool_imp.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 ReplayerImpl::ReplayerImpl(DB* db,
21 const std::vector<ColumnFamilyHandle*>& handles,
22 std::unique_ptr<TraceReader>&& reader)
23 : Replayer(),
24 trace_reader_(std::move(reader)),
25 prepared_(false),
26 trace_end_(false),
27 header_ts_(0),
28 exec_handler_(TraceRecord::NewExecutionHandler(db, handles)),
29 env_(db->GetEnv()),
30 trace_file_version_(-1) {}
31
32 ReplayerImpl::~ReplayerImpl() {
33 exec_handler_.reset();
34 trace_reader_.reset();
35 }
36
37 Status ReplayerImpl::Prepare() {
38 Trace header;
39 int db_version;
40 Status s = ReadHeader(&header);
41 if (!s.ok()) {
42 return s;
43 }
44 s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
45 if (!s.ok()) {
46 return s;
47 }
48 header_ts_ = header.ts;
49 prepared_ = true;
50 trace_end_ = false;
51 return Status::OK();
52 }
53
54 Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
55 if (!prepared_) {
56 return Status::Incomplete("Not prepared!");
57 }
58 if (trace_end_) {
59 return Status::Incomplete("Trace end.");
60 }
61
62 Trace trace;
63 Status s = ReadTrace(&trace); // ReadTrace is atomic
64 // Reached the trace end.
65 if (s.ok() && trace.type == kTraceEnd) {
66 trace_end_ = true;
67 return Status::Incomplete("Trace end.");
68 }
69 if (!s.ok() || record == nullptr) {
70 return s;
71 }
72
73 return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
74 }
75
76 Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
77 std::unique_ptr<TraceRecordResult>* result) {
78 return record->Accept(exec_handler_.get(), result);
79 }
80
81 Status ReplayerImpl::Replay(
82 const ReplayOptions& options,
83 const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
84 result_callback) {
85 if (options.fast_forward <= 0.0) {
86 return Status::InvalidArgument("Wrong fast forward speed!");
87 }
88
89 if (!prepared_) {
90 return Status::Incomplete("Not prepared!");
91 }
92 if (trace_end_) {
93 return Status::Incomplete("Trace end.");
94 }
95
96 Status s = Status::OK();
97
98 if (options.num_threads <= 1) {
99 // num_threads == 0 or num_threads == 1 uses single thread.
100 std::chrono::system_clock::time_point replay_epoch =
101 std::chrono::system_clock::now();
102
103 while (s.ok()) {
104 Trace trace;
105 s = ReadTrace(&trace);
106 // If already at trace end, ReadTrace should return Status::Incomplete().
107 if (!s.ok()) {
108 break;
109 }
110
111 // No need to sleep before breaking the loop if at the trace end.
112 if (trace.type == kTraceEnd) {
113 trace_end_ = true;
114 s = Status::Incomplete("Trace end.");
115 break;
116 }
117
118 // In single-threaded replay, decode first then sleep.
119 std::unique_ptr<TraceRecord> record;
120 s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
121 if (!s.ok() && !s.IsNotSupported()) {
122 break;
123 }
124
125 std::chrono::system_clock::time_point sleep_to =
126 replay_epoch +
127 std::chrono::microseconds(static_cast<uint64_t>(std::llround(
128 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
129 if (sleep_to > std::chrono::system_clock::now()) {
130 std::this_thread::sleep_until(sleep_to);
131 }
132
133 // Skip unsupported traces, stop for other errors.
134 if (s.IsNotSupported()) {
135 if (result_callback != nullptr) {
136 result_callback(s, nullptr);
137 }
138 s = Status::OK();
139 continue;
140 }
141
142 if (result_callback == nullptr) {
143 s = Execute(record, nullptr);
144 } else {
145 std::unique_ptr<TraceRecordResult> res;
146 s = Execute(record, &res);
147 result_callback(s, std::move(res));
148 }
149 }
150 } else {
151 // Multi-threaded replay.
152 ThreadPoolImpl thread_pool;
153 thread_pool.SetHostEnv(env_);
154 thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
155
156 std::mutex mtx;
157 // Background decoding and execution status.
158 Status bg_s = Status::OK();
159 uint64_t last_err_ts = static_cast<uint64_t>(-1);
160 // Callback function used in background work to update bg_s for the ealiest
161 // TraceRecord which has execution error. This is different from the
162 // timestamp of the first execution error (either start or end timestamp).
163 //
164 // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution
165 // timestamps are T1_start, T1_end, T2_start, T2_end.
166 // Single-thread: there must be T1_start < T1_end < T2_start < T2_end.
167 // Multi-thread: T1_start < T2_start may not be enforced. Orders of them are
168 // totally unknown.
169 // In order to report the same `first` error in both single-thread and
170 // multi-thread replay, we can only rely on the TraceRecords' timestamps,
171 // rather than their executin timestamps. Although in single-thread replay,
172 // the first error is also the last error, while in multi-thread replay, the
173 // first error may not be the first error in execution, and it may not be
174 // the last error in exeution as well.
175 auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
176 std::lock_guard<std::mutex> gd(mtx);
177 // Only record the first error.
178 if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
179 bg_s = err;
180 last_err_ts = err_ts;
181 }
182 };
183
184 std::chrono::system_clock::time_point replay_epoch =
185 std::chrono::system_clock::now();
186
187 while (bg_s.ok() && s.ok()) {
188 Trace trace;
189 s = ReadTrace(&trace);
190 // If already at trace end, ReadTrace should return Status::Incomplete().
191 if (!s.ok()) {
192 break;
193 }
194
195 TraceType trace_type = trace.type;
196
197 // No need to sleep before breaking the loop if at the trace end.
198 if (trace_type == kTraceEnd) {
199 trace_end_ = true;
200 s = Status::Incomplete("Trace end.");
201 break;
202 }
203
204 // In multi-threaded replay, sleep first then start decoding and
205 // execution in a thread.
206 std::chrono::system_clock::time_point sleep_to =
207 replay_epoch +
208 std::chrono::microseconds(static_cast<uint64_t>(std::llround(
209 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
210 if (sleep_to > std::chrono::system_clock::now()) {
211 std::this_thread::sleep_until(sleep_to);
212 }
213
214 if (trace_type == kTraceWrite || trace_type == kTraceGet ||
215 trace_type == kTraceIteratorSeek ||
216 trace_type == kTraceIteratorSeekForPrev ||
217 trace_type == kTraceMultiGet) {
218 std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
219 ra->trace_entry = std::move(trace);
220 ra->handler = exec_handler_.get();
221 ra->trace_file_version = trace_file_version_;
222 ra->error_cb = error_cb;
223 ra->result_cb = result_callback;
224 thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
225 nullptr, nullptr);
226 } else {
227 // Skip unsupported traces.
228 if (result_callback != nullptr) {
229 result_callback(Status::NotSupported("Unsupported trace type."),
230 nullptr);
231 }
232 }
233 }
234
235 thread_pool.WaitForJobsAndJoinAllThreads();
236 if (!bg_s.ok()) {
237 s = bg_s;
238 }
239 }
240
241 if (s.IsIncomplete()) {
242 // Reaching eof returns Incomplete status at the moment.
243 // Could happen when killing a process without calling EndTrace() API.
244 // TODO: Add better error handling.
245 trace_end_ = true;
246 return Status::OK();
247 }
248 return s;
249 }
250
251 uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
252
253 Status ReplayerImpl::ReadHeader(Trace* header) {
254 assert(header != nullptr);
255 Status s = trace_reader_->Reset();
256 if (!s.ok()) {
257 return s;
258 }
259 std::string encoded_trace;
260 // Read the trace head
261 s = trace_reader_->Read(&encoded_trace);
262 if (!s.ok()) {
263 return s;
264 }
265
266 return TracerHelper::DecodeHeader(encoded_trace, header);
267 }
268
269 Status ReplayerImpl::ReadTrace(Trace* trace) {
270 assert(trace != nullptr);
271 std::string encoded_trace;
272 // We don't know if TraceReader is implemented thread-safe, so we protect the
273 // reading trace part with a mutex. The decoding part does not need to be
274 // protected since it's local.
275 {
276 std::lock_guard<std::mutex> guard(mutex_);
277 Status s = trace_reader_->Read(&encoded_trace);
278 if (!s.ok()) {
279 return s;
280 }
281 }
282 return TracerHelper::DecodeTrace(encoded_trace, trace);
283 }
284
285 void ReplayerImpl::BackgroundWork(void* arg) {
286 std::unique_ptr<ReplayerWorkerArg> ra(
287 reinterpret_cast<ReplayerWorkerArg*>(arg));
288 assert(ra != nullptr);
289
290 std::unique_ptr<TraceRecord> record;
291 Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
292 ra->trace_file_version, &record);
293 if (!s.ok()) {
294 // Stop the replay
295 if (ra->error_cb != nullptr) {
296 ra->error_cb(s, ra->trace_entry.ts);
297 }
298 // Report the result
299 if (ra->result_cb != nullptr) {
300 ra->result_cb(s, nullptr);
301 }
302 return;
303 }
304
305 if (ra->result_cb == nullptr) {
306 s = record->Accept(ra->handler, nullptr);
307 } else {
308 std::unique_ptr<TraceRecordResult> res;
309 s = record->Accept(ra->handler, &res);
310 ra->result_cb(s, std::move(res));
311 }
312 record.reset();
313 }
314
315 } // namespace ROCKSDB_NAMESPACE
316 #endif // ROCKSDB_LITE