]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/trace_replay/trace_replay.cc
a0f9a504f08e37a99d36ada32b724e23841cfc3f
[ceph.git] / ceph / src / rocksdb / trace_replay / trace_replay.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "trace_replay/trace_replay.h"
7
8 #include <chrono>
9 #include <sstream>
10 #include <thread>
11 #include "db/db_impl/db_impl.h"
12 #include "rocksdb/slice.h"
13 #include "rocksdb/write_batch.h"
14 #include "util/coding.h"
15 #include "util/string_util.h"
16 #include "util/threadpool_imp.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 const std::string kTraceMagic = "feedcafedeadbeef";
21
22 namespace {
23 void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
24 PutFixed32(dst, cf_id);
25 PutLengthPrefixedSlice(dst, key);
26 }
27
28 void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
29 Slice buf(buffer);
30 GetFixed32(&buf, cf_id);
31 GetLengthPrefixedSlice(&buf, key);
32 }
33 } // namespace
34
35 void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
36 assert(encoded_trace);
37 PutFixed64(encoded_trace, trace.ts);
38 encoded_trace->push_back(trace.type);
39 PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
40 encoded_trace->append(trace.payload);
41 }
42
43 Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
44 Trace* trace) {
45 assert(trace != nullptr);
46 Slice enc_slice = Slice(encoded_trace);
47 if (!GetFixed64(&enc_slice, &trace->ts)) {
48 return Status::Incomplete("Decode trace string failed");
49 }
50 if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
51 return Status::Incomplete("Decode trace string failed");
52 }
53 trace->type = static_cast<TraceType>(enc_slice[0]);
54 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
55 trace->payload = enc_slice.ToString();
56 return Status::OK();
57 }
58
// Constructs a Tracer and immediately writes the trace header record.
// NOTE(review): the Status returned by WriteHeader() is discarded here, so a
// failed header write is not surfaced to the caller.
Tracer::Tracer(Env* env, const TraceOptions& trace_options,
               std::unique_ptr<TraceWriter>&& trace_writer)
    : env_(env),
      trace_options_(trace_options),
      trace_writer_(std::move(trace_writer)),
      trace_request_count_ (0) {
  WriteHeader();
}
67
68 Tracer::~Tracer() { trace_writer_.reset(); }
69
70 Status Tracer::Write(WriteBatch* write_batch) {
71 TraceType trace_type = kTraceWrite;
72 if (ShouldSkipTrace(trace_type)) {
73 return Status::OK();
74 }
75 Trace trace;
76 trace.ts = env_->NowMicros();
77 trace.type = trace_type;
78 trace.payload = write_batch->Data();
79 return WriteTrace(trace);
80 }
81
82 Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
83 TraceType trace_type = kTraceGet;
84 if (ShouldSkipTrace(trace_type)) {
85 return Status::OK();
86 }
87 Trace trace;
88 trace.ts = env_->NowMicros();
89 trace.type = trace_type;
90 EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
91 return WriteTrace(trace);
92 }
93
94 Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
95 TraceType trace_type = kTraceIteratorSeek;
96 if (ShouldSkipTrace(trace_type)) {
97 return Status::OK();
98 }
99 Trace trace;
100 trace.ts = env_->NowMicros();
101 trace.type = trace_type;
102 EncodeCFAndKey(&trace.payload, cf_id, key);
103 return WriteTrace(trace);
104 }
105
106 Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
107 TraceType trace_type = kTraceIteratorSeekForPrev;
108 if (ShouldSkipTrace(trace_type)) {
109 return Status::OK();
110 }
111 Trace trace;
112 trace.ts = env_->NowMicros();
113 trace.type = trace_type;
114 EncodeCFAndKey(&trace.payload, cf_id, key);
115 return WriteTrace(trace);
116 }
117
118 bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
119 if (IsTraceFileOverMax()) {
120 return true;
121 }
122 if ((trace_options_.filter & kTraceFilterGet
123 && trace_type == kTraceGet)
124 || (trace_options_.filter & kTraceFilterWrite
125 && trace_type == kTraceWrite)) {
126 return true;
127 }
128 ++trace_request_count_;
129 if (trace_request_count_ < trace_options_.sampling_frequency) {
130 return true;
131 }
132 trace_request_count_ = 0;
133 return false;
134 }
135
136 bool Tracer::IsTraceFileOverMax() {
137 uint64_t trace_file_size = trace_writer_->GetFileSize();
138 return (trace_file_size > trace_options_.max_trace_file_size);
139 }
140
141 Status Tracer::WriteHeader() {
142 std::ostringstream s;
143 s << kTraceMagic << "\t"
144 << "Trace Version: 0.1\t"
145 << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
146 << "Format: Timestamp OpType Payload\n";
147 std::string header(s.str());
148
149 Trace trace;
150 trace.ts = env_->NowMicros();
151 trace.type = kTraceBegin;
152 trace.payload = header;
153 return WriteTrace(trace);
154 }
155
156 Status Tracer::WriteFooter() {
157 Trace trace;
158 trace.ts = env_->NowMicros();
159 trace.type = kTraceEnd;
160 trace.payload = "";
161 return WriteTrace(trace);
162 }
163
164 Status Tracer::WriteTrace(const Trace& trace) {
165 std::string encoded_trace;
166 TracerHelper::EncodeTrace(trace, &encoded_trace);
167 return trace_writer_->Write(Slice(encoded_trace));
168 }
169
170 Status Tracer::Close() { return WriteFooter(); }
171
172 Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
173 std::unique_ptr<TraceReader>&& reader)
174 : trace_reader_(std::move(reader)) {
175 assert(db != nullptr);
176 db_ = static_cast<DBImpl*>(db->GetRootDB());
177 env_ = Env::Default();
178 for (ColumnFamilyHandle* cfh : handles) {
179 cf_map_[cfh->GetID()] = cfh;
180 }
181 fast_forward_ = 1;
182 }
183
184 Replayer::~Replayer() { trace_reader_.reset(); }
185
186 Status Replayer::SetFastForward(uint32_t fast_forward) {
187 Status s;
188 if (fast_forward < 1) {
189 s = Status::InvalidArgument("Wrong fast forward speed!");
190 } else {
191 fast_forward_ = fast_forward;
192 s = Status::OK();
193 }
194 return s;
195 }
196
// Replays the trace single-threadedly: reads records sequentially and issues
// each query against db_ at its recorded offset from the trace start, scaled
// down by fast_forward_. Returns OK on a clean end (kTraceEnd) and also maps
// an EOF (Incomplete) to OK; any other read/decode error is returned.
// NOTE(review): the Status of the replayed Write/Get calls is ignored, and
// `ops` is counted but never reported.
Status Replayer::Replay() {
  Status s;
  Trace header;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }

  // Timestamps in the trace are relative to header.ts; wall-clock replay is
  // anchored at this epoch.
  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  Trace trace;
  uint64_t ops = 0;
  Iterator* single_iter = nullptr;
  while (s.ok()) {
    trace.reset();
    s = ReadTrace(&trace);
    if (!s.ok()) {
      break;
    }

    // Sleep until this record's scheduled (scaled) replay time.
    std::this_thread::sleep_until(
        replay_epoch +
        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
    if (trace.type == kTraceWrite) {
      // Payload is a serialized WriteBatch; re-apply it as-is.
      WriteBatch batch(trace.payload);
      db_->Write(woptions, &batch);
      ops++;
    } else if (trace.type == kTraceGet) {
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      // cf_id 0 means the default column family; any other ID must be known.
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      std::string value;
      if (cf_id == 0) {
        db_->Get(roptions, key, &value);
      } else {
        db_->Get(roptions, cf_map_[cf_id], key, &value);
      }
      ops++;
    } else if (trace.type == kTraceIteratorSeek) {
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      // A fresh iterator per record: seek once, then discard.
      if (cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
      }
      single_iter->Seek(key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceIteratorSeekForPrev) {
      // Currently, only support to call the Seek()
      uint32_t cf_id = 0;
      Slice key;
      DecodeCFAndKey(trace.payload, &cf_id, &key);
      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
        return Status::Corruption("Invalid Column Family ID.");
      }

      if (cf_id == 0) {
        single_iter = db_->NewIterator(roptions);
      } else {
        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
      }
      single_iter->SeekForPrev(key);
      ops++;
      delete single_iter;
    } else if (trace.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    }
    // Unhandled trace types fall through and are skipped.
  }

  if (s.IsIncomplete()) {
    // Reaching eof returns Incomplete status at the moment.
    // Could happen when killing a process without calling EndTrace() API.
    // TODO: Add better error handling.
    return Status::OK();
  }
  return s;
}
289
// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
// Returns OK on a clean end (kTraceEnd); EOF (Incomplete) is mapped to OK.
// NOTE(review): unlike Replay(), ra->cf_map is never assigned here before
// scheduling — the BGWork* handlers read that field; confirm it is populated
// elsewhere (e.g. by the ReplayerWorkerArg definition) before relying on it.
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
  Status s;
  Trace header;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }

  ThreadPoolImpl thread_pool;
  thread_pool.SetHostEnv(env_);

  // Always keep at least one background thread.
  if (threads_num > 1) {
    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
  } else {
    thread_pool.SetBackgroundThreads(1);
  }

  // Wall-clock anchor; record timestamps are replayed relative to header.ts,
  // scaled down by fast_forward_.
  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  uint64_t ops = 0;
  while (s.ok()) {
    // One heap-allocated argument per scheduled task; ownership passes to
    // the background handler via ra.release().
    std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
    ra->db = db_;
    s = ReadTrace(&(ra->trace_entry));
    if (!s.ok()) {
      break;
    }
    ra->woptions = woptions;
    ra->roptions = roptions;

    // Wait until this record's scheduled (scaled) replay time before
    // enqueueing it.
    std::this_thread::sleep_until(
        replay_epoch + std::chrono::microseconds(
                           (ra->trace_entry.ts - header.ts) / fast_forward_));
    if (ra->trace_entry.type == kTraceWrite) {
      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceGet) {
      thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
                           nullptr, nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      break;
    } else {
      // Other trace entry types that are not implemented for replay.
      // To finish the replay, we continue the process.
      continue;
    }
  }

  if (s.IsIncomplete()) {
    // Reaching eof returns Incomplete status at the moment.
    // Could happen when killing a process without calling EndTrace() API.
    // TODO: Add better error handling.
    s = Status::OK();
  }
  // Drain all scheduled queries before returning.
  thread_pool.JoinAllThreads();
  return s;
}
366
367 Status Replayer::ReadHeader(Trace* header) {
368 assert(header != nullptr);
369 Status s = ReadTrace(header);
370 if (!s.ok()) {
371 return s;
372 }
373 if (header->type != kTraceBegin) {
374 return Status::Corruption("Corrupted trace file. Incorrect header.");
375 }
376 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
377 return Status::Corruption("Corrupted trace file. Incorrect magic.");
378 }
379
380 return s;
381 }
382
383 Status Replayer::ReadFooter(Trace* footer) {
384 assert(footer != nullptr);
385 Status s = ReadTrace(footer);
386 if (!s.ok()) {
387 return s;
388 }
389 if (footer->type != kTraceEnd) {
390 return Status::Corruption("Corrupted trace file. Incorrect footer.");
391 }
392
393 // TODO: Add more validations later
394 return s;
395 }
396
397 Status Replayer::ReadTrace(Trace* trace) {
398 assert(trace != nullptr);
399 std::string encoded_trace;
400 Status s = trace_reader_->Read(&encoded_trace);
401 if (!s.ok()) {
402 return s;
403 }
404 return TracerHelper::DecodeTrace(encoded_trace, trace);
405 }
406
407 void Replayer::BGWorkGet(void* arg) {
408 std::unique_ptr<ReplayerWorkerArg> ra(
409 reinterpret_cast<ReplayerWorkerArg*>(arg));
410 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
411 ra->cf_map);
412 uint32_t cf_id = 0;
413 Slice key;
414 DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
415 if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
416 return;
417 }
418
419 std::string value;
420 if (cf_id == 0) {
421 ra->db->Get(ra->roptions, key, &value);
422 } else {
423 ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
424 }
425
426 return;
427 }
428
429 void Replayer::BGWorkWriteBatch(void* arg) {
430 std::unique_ptr<ReplayerWorkerArg> ra(
431 reinterpret_cast<ReplayerWorkerArg*>(arg));
432 WriteBatch batch(ra->trace_entry.payload);
433 ra->db->Write(ra->woptions, &batch);
434 return;
435 }
436
437 void Replayer::BGWorkIterSeek(void* arg) {
438 std::unique_ptr<ReplayerWorkerArg> ra(
439 reinterpret_cast<ReplayerWorkerArg*>(arg));
440 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
441 ra->cf_map);
442 uint32_t cf_id = 0;
443 Slice key;
444 DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
445 if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
446 return;
447 }
448
449 std::string value;
450 Iterator* single_iter = nullptr;
451 if (cf_id == 0) {
452 single_iter = ra->db->NewIterator(ra->roptions);
453 } else {
454 single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
455 }
456 single_iter->Seek(key);
457 delete single_iter;
458 return;
459 }
460
461 void Replayer::BGWorkIterSeekForPrev(void* arg) {
462 std::unique_ptr<ReplayerWorkerArg> ra(
463 reinterpret_cast<ReplayerWorkerArg*>(arg));
464 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
465 ra->cf_map);
466 uint32_t cf_id = 0;
467 Slice key;
468 DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
469 if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
470 return;
471 }
472
473 std::string value;
474 Iterator* single_iter = nullptr;
475 if (cf_id == 0) {
476 single_iter = ra->db->NewIterator(ra->roptions);
477 } else {
478 single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
479 }
480 single_iter->SeekForPrev(key);
481 delete single_iter;
482 return;
483 }
484
485 } // namespace ROCKSDB_NAMESPACE