]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/trace_replay/trace_replay.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / trace_replay / trace_replay.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
f67539c2 6#include "trace_replay/trace_replay.h"
11fdf7f2
TL
7
8#include <chrono>
9#include <sstream>
10#include <thread>
f67539c2 11#include "db/db_impl/db_impl.h"
11fdf7f2
TL
12#include "rocksdb/slice.h"
13#include "rocksdb/write_batch.h"
14#include "util/coding.h"
15#include "util/string_util.h"
f67539c2 16#include "util/threadpool_imp.h"
11fdf7f2 17
f67539c2 18namespace ROCKSDB_NAMESPACE {
11fdf7f2 19
494da23a
TL
// Magic string placed at the very start of the header record's payload by
// Tracer::WriteHeader() and verified by Replayer::ReadHeader().
const std::string kTraceMagic("feedcafedeadbeef");
21
11fdf7f2
TL
22namespace {
23void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
24 PutFixed32(dst, cf_id);
25 PutLengthPrefixedSlice(dst, key);
26}
27
28void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
29 Slice buf(buffer);
30 GetFixed32(&buf, cf_id);
31 GetLengthPrefixedSlice(&buf, key);
32}
33} // namespace
34
f67539c2
TL
35void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
36 assert(encoded_trace);
37 PutFixed64(encoded_trace, trace.ts);
38 encoded_trace->push_back(trace.type);
39 PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
40 encoded_trace->append(trace.payload);
41}
42
43Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
44 Trace* trace) {
45 assert(trace != nullptr);
46 Slice enc_slice = Slice(encoded_trace);
47 if (!GetFixed64(&enc_slice, &trace->ts)) {
48 return Status::Incomplete("Decode trace string failed");
49 }
50 if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
51 return Status::Incomplete("Decode trace string failed");
52 }
53 trace->type = static_cast<TraceType>(enc_slice[0]);
54 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
55 trace->payload = enc_slice.ToString();
56 return Status::OK();
57}
58
494da23a
TL
59Tracer::Tracer(Env* env, const TraceOptions& trace_options,
60 std::unique_ptr<TraceWriter>&& trace_writer)
61 : env_(env),
62 trace_options_(trace_options),
63 trace_writer_(std::move(trace_writer)),
64 trace_request_count_ (0) {
20effc67
TL
65 // TODO: What if this fails?
66 WriteHeader().PermitUncheckedError();
11fdf7f2
TL
67}
68
69Tracer::~Tracer() { trace_writer_.reset(); }
70
71Status Tracer::Write(WriteBatch* write_batch) {
494da23a
TL
72 TraceType trace_type = kTraceWrite;
73 if (ShouldSkipTrace(trace_type)) {
74 return Status::OK();
75 }
11fdf7f2
TL
76 Trace trace;
77 trace.ts = env_->NowMicros();
494da23a 78 trace.type = trace_type;
11fdf7f2
TL
79 trace.payload = write_batch->Data();
80 return WriteTrace(trace);
81}
82
83Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
494da23a
TL
84 TraceType trace_type = kTraceGet;
85 if (ShouldSkipTrace(trace_type)) {
86 return Status::OK();
87 }
11fdf7f2
TL
88 Trace trace;
89 trace.ts = env_->NowMicros();
494da23a 90 trace.type = trace_type;
11fdf7f2
TL
91 EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
92 return WriteTrace(trace);
93}
94
95Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
494da23a
TL
96 TraceType trace_type = kTraceIteratorSeek;
97 if (ShouldSkipTrace(trace_type)) {
98 return Status::OK();
99 }
11fdf7f2
TL
100 Trace trace;
101 trace.ts = env_->NowMicros();
494da23a 102 trace.type = trace_type;
11fdf7f2
TL
103 EncodeCFAndKey(&trace.payload, cf_id, key);
104 return WriteTrace(trace);
105}
106
107Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
494da23a
TL
108 TraceType trace_type = kTraceIteratorSeekForPrev;
109 if (ShouldSkipTrace(trace_type)) {
110 return Status::OK();
111 }
11fdf7f2
TL
112 Trace trace;
113 trace.ts = env_->NowMicros();
494da23a 114 trace.type = trace_type;
11fdf7f2
TL
115 EncodeCFAndKey(&trace.payload, cf_id, key);
116 return WriteTrace(trace);
117}
118
494da23a
TL
119bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
120 if (IsTraceFileOverMax()) {
121 return true;
122 }
123 if ((trace_options_.filter & kTraceFilterGet
124 && trace_type == kTraceGet)
125 || (trace_options_.filter & kTraceFilterWrite
126 && trace_type == kTraceWrite)) {
127 return true;
128 }
129 ++trace_request_count_;
130 if (trace_request_count_ < trace_options_.sampling_frequency) {
131 return true;
132 }
133 trace_request_count_ = 0;
134 return false;
135}
136
137bool Tracer::IsTraceFileOverMax() {
138 uint64_t trace_file_size = trace_writer_->GetFileSize();
139 return (trace_file_size > trace_options_.max_trace_file_size);
140}
141
11fdf7f2
TL
142Status Tracer::WriteHeader() {
143 std::ostringstream s;
144 s << kTraceMagic << "\t"
145 << "Trace Version: 0.1\t"
146 << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
147 << "Format: Timestamp OpType Payload\n";
148 std::string header(s.str());
149
150 Trace trace;
151 trace.ts = env_->NowMicros();
152 trace.type = kTraceBegin;
153 trace.payload = header;
154 return WriteTrace(trace);
155}
156
157Status Tracer::WriteFooter() {
158 Trace trace;
159 trace.ts = env_->NowMicros();
160 trace.type = kTraceEnd;
161 trace.payload = "";
162 return WriteTrace(trace);
163}
164
165Status Tracer::WriteTrace(const Trace& trace) {
166 std::string encoded_trace;
f67539c2 167 TracerHelper::EncodeTrace(trace, &encoded_trace);
11fdf7f2
TL
168 return trace_writer_->Write(Slice(encoded_trace));
169}
170
171Status Tracer::Close() { return WriteFooter(); }
172
173Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
494da23a 174 std::unique_ptr<TraceReader>&& reader)
11fdf7f2
TL
175 : trace_reader_(std::move(reader)) {
176 assert(db != nullptr);
177 db_ = static_cast<DBImpl*>(db->GetRootDB());
f67539c2 178 env_ = Env::Default();
11fdf7f2
TL
179 for (ColumnFamilyHandle* cfh : handles) {
180 cf_map_[cfh->GetID()] = cfh;
181 }
f67539c2 182 fast_forward_ = 1;
11fdf7f2
TL
183}
184
185Replayer::~Replayer() { trace_reader_.reset(); }
186
f67539c2
TL
187Status Replayer::SetFastForward(uint32_t fast_forward) {
188 Status s;
189 if (fast_forward < 1) {
190 s = Status::InvalidArgument("Wrong fast forward speed!");
191 } else {
192 fast_forward_ = fast_forward;
193 s = Status::OK();
194 }
195 return s;
196}
197
11fdf7f2
TL
198Status Replayer::Replay() {
199 Status s;
200 Trace header;
201 s = ReadHeader(&header);
202 if (!s.ok()) {
203 return s;
204 }
205
206 std::chrono::system_clock::time_point replay_epoch =
207 std::chrono::system_clock::now();
208 WriteOptions woptions;
209 ReadOptions roptions;
210 Trace trace;
211 uint64_t ops = 0;
212 Iterator* single_iter = nullptr;
213 while (s.ok()) {
214 trace.reset();
215 s = ReadTrace(&trace);
216 if (!s.ok()) {
217 break;
218 }
219
220 std::this_thread::sleep_until(
f67539c2
TL
221 replay_epoch +
222 std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
11fdf7f2
TL
223 if (trace.type == kTraceWrite) {
224 WriteBatch batch(trace.payload);
225 db_->Write(woptions, &batch);
226 ops++;
227 } else if (trace.type == kTraceGet) {
228 uint32_t cf_id = 0;
229 Slice key;
230 DecodeCFAndKey(trace.payload, &cf_id, &key);
231 if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
232 return Status::Corruption("Invalid Column Family ID.");
233 }
234
235 std::string value;
236 if (cf_id == 0) {
237 db_->Get(roptions, key, &value);
238 } else {
239 db_->Get(roptions, cf_map_[cf_id], key, &value);
240 }
241 ops++;
242 } else if (trace.type == kTraceIteratorSeek) {
243 uint32_t cf_id = 0;
244 Slice key;
245 DecodeCFAndKey(trace.payload, &cf_id, &key);
246 if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
247 return Status::Corruption("Invalid Column Family ID.");
248 }
249
250 if (cf_id == 0) {
251 single_iter = db_->NewIterator(roptions);
252 } else {
253 single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
254 }
255 single_iter->Seek(key);
256 ops++;
257 delete single_iter;
258 } else if (trace.type == kTraceIteratorSeekForPrev) {
259 // Currently, only support to call the Seek()
260 uint32_t cf_id = 0;
261 Slice key;
262 DecodeCFAndKey(trace.payload, &cf_id, &key);
263 if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
264 return Status::Corruption("Invalid Column Family ID.");
265 }
266
267 if (cf_id == 0) {
268 single_iter = db_->NewIterator(roptions);
269 } else {
270 single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
271 }
272 single_iter->SeekForPrev(key);
273 ops++;
274 delete single_iter;
275 } else if (trace.type == kTraceEnd) {
276 // Do nothing for now.
277 // TODO: Add some validations later.
278 break;
279 }
280 }
281
282 if (s.IsIncomplete()) {
283 // Reaching eof returns Incomplete status at the moment.
284 // Could happen when killing a process without calling EndTrace() API.
285 // TODO: Add better error handling.
286 return Status::OK();
287 }
288 return s;
289}
290
f67539c2
TL
291// The trace can be replayed with multithread by configurnge the number of
292// threads in the thread pool. Trace records are read from the trace file
293// sequentially and the corresponding queries are scheduled in the task
294// queue based on the timestamp. Currently, we support Write_batch (Put,
295// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
296Status Replayer::MultiThreadReplay(uint32_t threads_num) {
297 Status s;
298 Trace header;
299 s = ReadHeader(&header);
300 if (!s.ok()) {
301 return s;
302 }
303
304 ThreadPoolImpl thread_pool;
305 thread_pool.SetHostEnv(env_);
306
307 if (threads_num > 1) {
308 thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
309 } else {
310 thread_pool.SetBackgroundThreads(1);
311 }
312
313 std::chrono::system_clock::time_point replay_epoch =
314 std::chrono::system_clock::now();
315 WriteOptions woptions;
316 ReadOptions roptions;
317 uint64_t ops = 0;
318 while (s.ok()) {
319 std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
320 ra->db = db_;
321 s = ReadTrace(&(ra->trace_entry));
322 if (!s.ok()) {
323 break;
324 }
20effc67 325 ra->cf_map = &cf_map_;
f67539c2
TL
326 ra->woptions = woptions;
327 ra->roptions = roptions;
328
329 std::this_thread::sleep_until(
330 replay_epoch + std::chrono::microseconds(
331 (ra->trace_entry.ts - header.ts) / fast_forward_));
332 if (ra->trace_entry.type == kTraceWrite) {
333 thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
334 nullptr);
335 ops++;
336 } else if (ra->trace_entry.type == kTraceGet) {
337 thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
338 nullptr);
339 ops++;
340 } else if (ra->trace_entry.type == kTraceIteratorSeek) {
341 thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
342 nullptr);
343 ops++;
344 } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
345 thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
346 nullptr, nullptr);
347 ops++;
348 } else if (ra->trace_entry.type == kTraceEnd) {
349 // Do nothing for now.
350 // TODO: Add some validations later.
351 break;
352 } else {
353 // Other trace entry types that are not implemented for replay.
354 // To finish the replay, we continue the process.
355 continue;
356 }
357 }
358
359 if (s.IsIncomplete()) {
360 // Reaching eof returns Incomplete status at the moment.
361 // Could happen when killing a process without calling EndTrace() API.
362 // TODO: Add better error handling.
363 s = Status::OK();
364 }
365 thread_pool.JoinAllThreads();
366 return s;
367}
368
11fdf7f2
TL
369Status Replayer::ReadHeader(Trace* header) {
370 assert(header != nullptr);
371 Status s = ReadTrace(header);
372 if (!s.ok()) {
373 return s;
374 }
375 if (header->type != kTraceBegin) {
376 return Status::Corruption("Corrupted trace file. Incorrect header.");
377 }
378 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
379 return Status::Corruption("Corrupted trace file. Incorrect magic.");
380 }
381
382 return s;
383}
384
385Status Replayer::ReadFooter(Trace* footer) {
386 assert(footer != nullptr);
387 Status s = ReadTrace(footer);
388 if (!s.ok()) {
389 return s;
390 }
391 if (footer->type != kTraceEnd) {
392 return Status::Corruption("Corrupted trace file. Incorrect footer.");
393 }
394
395 // TODO: Add more validations later
396 return s;
397}
398
399Status Replayer::ReadTrace(Trace* trace) {
400 assert(trace != nullptr);
401 std::string encoded_trace;
402 Status s = trace_reader_->Read(&encoded_trace);
403 if (!s.ok()) {
404 return s;
405 }
f67539c2
TL
406 return TracerHelper::DecodeTrace(encoded_trace, trace);
407}
11fdf7f2 408
f67539c2
TL
409void Replayer::BGWorkGet(void* arg) {
410 std::unique_ptr<ReplayerWorkerArg> ra(
411 reinterpret_cast<ReplayerWorkerArg*>(arg));
20effc67 412 assert(ra != nullptr);
f67539c2
TL
413 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
414 ra->cf_map);
415 uint32_t cf_id = 0;
416 Slice key;
417 DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
418 if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
419 return;
420 }
421
422 std::string value;
423 if (cf_id == 0) {
424 ra->db->Get(ra->roptions, key, &value);
425 } else {
426 ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
427 }
428
429 return;
430}
431
432void Replayer::BGWorkWriteBatch(void* arg) {
433 std::unique_ptr<ReplayerWorkerArg> ra(
434 reinterpret_cast<ReplayerWorkerArg*>(arg));
20effc67 435 assert(ra != nullptr);
f67539c2
TL
436 WriteBatch batch(ra->trace_entry.payload);
437 ra->db->Write(ra->woptions, &batch);
438 return;
439}
440
441void Replayer::BGWorkIterSeek(void* arg) {
442 std::unique_ptr<ReplayerWorkerArg> ra(
443 reinterpret_cast<ReplayerWorkerArg*>(arg));
20effc67 444 assert(ra != nullptr);
f67539c2
TL
445 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
446 ra->cf_map);
447 uint32_t cf_id = 0;
448 Slice key;
449 DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
450 if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
451 return;
452 }
453
454 std::string value;
455 Iterator* single_iter = nullptr;
456 if (cf_id == 0) {
457 single_iter = ra->db->NewIterator(ra->roptions);
458 } else {
459 single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
460 }
461 single_iter->Seek(key);
462 delete single_iter;
463 return;
464}
465
466void Replayer::BGWorkIterSeekForPrev(void* arg) {
467 std::unique_ptr<ReplayerWorkerArg> ra(
468 reinterpret_cast<ReplayerWorkerArg*>(arg));
20effc67 469 assert(ra != nullptr);
f67539c2
TL
470 auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
471 ra->cf_map);
472 uint32_t cf_id = 0;
473 Slice key;
474 DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
475 if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
476 return;
477 }
478
479 std::string value;
480 Iterator* single_iter = nullptr;
481 if (cf_id == 0) {
482 single_iter = ra->db->NewIterator(ra->roptions);
483 } else {
484 single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
485 }
486 single_iter->SeekForPrev(key);
487 delete single_iter;
488 return;
11fdf7f2
TL
489}
490
f67539c2 491} // namespace ROCKSDB_NAMESPACE