]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
f67539c2 | 6 | #include "trace_replay/trace_replay.h" |
11fdf7f2 TL |
7 | |
8 | #include <chrono> | |
9 | #include <sstream> | |
10 | #include <thread> | |
f67539c2 | 11 | #include "db/db_impl/db_impl.h" |
11fdf7f2 TL |
12 | #include "rocksdb/slice.h" |
13 | #include "rocksdb/write_batch.h" | |
14 | #include "util/coding.h" | |
15 | #include "util/string_util.h" | |
f67539c2 | 16 | #include "util/threadpool_imp.h" |
11fdf7f2 | 17 | |
f67539c2 | 18 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 | 19 | |
494da23a TL |
// Magic string written at the start of every trace header payload; the
// replayer rejects a trace whose header payload does not begin with it.
const std::string kTraceMagic("feedcafedeadbeef");
21 | ||
11fdf7f2 TL |
22 | namespace { |
23 | void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) { | |
24 | PutFixed32(dst, cf_id); | |
25 | PutLengthPrefixedSlice(dst, key); | |
26 | } | |
27 | ||
28 | void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { | |
29 | Slice buf(buffer); | |
30 | GetFixed32(&buf, cf_id); | |
31 | GetLengthPrefixedSlice(&buf, key); | |
32 | } | |
33 | } // namespace | |
34 | ||
f67539c2 TL |
35 | void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) { |
36 | assert(encoded_trace); | |
37 | PutFixed64(encoded_trace, trace.ts); | |
38 | encoded_trace->push_back(trace.type); | |
39 | PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size())); | |
40 | encoded_trace->append(trace.payload); | |
41 | } | |
42 | ||
43 | Status TracerHelper::DecodeTrace(const std::string& encoded_trace, | |
44 | Trace* trace) { | |
45 | assert(trace != nullptr); | |
46 | Slice enc_slice = Slice(encoded_trace); | |
47 | if (!GetFixed64(&enc_slice, &trace->ts)) { | |
48 | return Status::Incomplete("Decode trace string failed"); | |
49 | } | |
50 | if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) { | |
51 | return Status::Incomplete("Decode trace string failed"); | |
52 | } | |
53 | trace->type = static_cast<TraceType>(enc_slice[0]); | |
54 | enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); | |
55 | trace->payload = enc_slice.ToString(); | |
56 | return Status::OK(); | |
57 | } | |
58 | ||
494da23a TL |
59 | Tracer::Tracer(Env* env, const TraceOptions& trace_options, |
60 | std::unique_ptr<TraceWriter>&& trace_writer) | |
61 | : env_(env), | |
62 | trace_options_(trace_options), | |
63 | trace_writer_(std::move(trace_writer)), | |
64 | trace_request_count_ (0) { | |
20effc67 TL |
65 | // TODO: What if this fails? |
66 | WriteHeader().PermitUncheckedError(); | |
11fdf7f2 TL |
67 | } |
68 | ||
69 | Tracer::~Tracer() { trace_writer_.reset(); } | |
70 | ||
71 | Status Tracer::Write(WriteBatch* write_batch) { | |
494da23a TL |
72 | TraceType trace_type = kTraceWrite; |
73 | if (ShouldSkipTrace(trace_type)) { | |
74 | return Status::OK(); | |
75 | } | |
11fdf7f2 TL |
76 | Trace trace; |
77 | trace.ts = env_->NowMicros(); | |
494da23a | 78 | trace.type = trace_type; |
11fdf7f2 TL |
79 | trace.payload = write_batch->Data(); |
80 | return WriteTrace(trace); | |
81 | } | |
82 | ||
83 | Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { | |
494da23a TL |
84 | TraceType trace_type = kTraceGet; |
85 | if (ShouldSkipTrace(trace_type)) { | |
86 | return Status::OK(); | |
87 | } | |
11fdf7f2 TL |
88 | Trace trace; |
89 | trace.ts = env_->NowMicros(); | |
494da23a | 90 | trace.type = trace_type; |
11fdf7f2 TL |
91 | EncodeCFAndKey(&trace.payload, column_family->GetID(), key); |
92 | return WriteTrace(trace); | |
93 | } | |
94 | ||
95 | Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { | |
494da23a TL |
96 | TraceType trace_type = kTraceIteratorSeek; |
97 | if (ShouldSkipTrace(trace_type)) { | |
98 | return Status::OK(); | |
99 | } | |
11fdf7f2 TL |
100 | Trace trace; |
101 | trace.ts = env_->NowMicros(); | |
494da23a | 102 | trace.type = trace_type; |
11fdf7f2 TL |
103 | EncodeCFAndKey(&trace.payload, cf_id, key); |
104 | return WriteTrace(trace); | |
105 | } | |
106 | ||
107 | Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { | |
494da23a TL |
108 | TraceType trace_type = kTraceIteratorSeekForPrev; |
109 | if (ShouldSkipTrace(trace_type)) { | |
110 | return Status::OK(); | |
111 | } | |
11fdf7f2 TL |
112 | Trace trace; |
113 | trace.ts = env_->NowMicros(); | |
494da23a | 114 | trace.type = trace_type; |
11fdf7f2 TL |
115 | EncodeCFAndKey(&trace.payload, cf_id, key); |
116 | return WriteTrace(trace); | |
117 | } | |
118 | ||
494da23a TL |
119 | bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { |
120 | if (IsTraceFileOverMax()) { | |
121 | return true; | |
122 | } | |
123 | if ((trace_options_.filter & kTraceFilterGet | |
124 | && trace_type == kTraceGet) | |
125 | || (trace_options_.filter & kTraceFilterWrite | |
126 | && trace_type == kTraceWrite)) { | |
127 | return true; | |
128 | } | |
129 | ++trace_request_count_; | |
130 | if (trace_request_count_ < trace_options_.sampling_frequency) { | |
131 | return true; | |
132 | } | |
133 | trace_request_count_ = 0; | |
134 | return false; | |
135 | } | |
136 | ||
137 | bool Tracer::IsTraceFileOverMax() { | |
138 | uint64_t trace_file_size = trace_writer_->GetFileSize(); | |
139 | return (trace_file_size > trace_options_.max_trace_file_size); | |
140 | } | |
141 | ||
11fdf7f2 TL |
142 | Status Tracer::WriteHeader() { |
143 | std::ostringstream s; | |
144 | s << kTraceMagic << "\t" | |
145 | << "Trace Version: 0.1\t" | |
146 | << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" | |
147 | << "Format: Timestamp OpType Payload\n"; | |
148 | std::string header(s.str()); | |
149 | ||
150 | Trace trace; | |
151 | trace.ts = env_->NowMicros(); | |
152 | trace.type = kTraceBegin; | |
153 | trace.payload = header; | |
154 | return WriteTrace(trace); | |
155 | } | |
156 | ||
157 | Status Tracer::WriteFooter() { | |
158 | Trace trace; | |
159 | trace.ts = env_->NowMicros(); | |
160 | trace.type = kTraceEnd; | |
161 | trace.payload = ""; | |
162 | return WriteTrace(trace); | |
163 | } | |
164 | ||
165 | Status Tracer::WriteTrace(const Trace& trace) { | |
166 | std::string encoded_trace; | |
f67539c2 | 167 | TracerHelper::EncodeTrace(trace, &encoded_trace); |
11fdf7f2 TL |
168 | return trace_writer_->Write(Slice(encoded_trace)); |
169 | } | |
170 | ||
171 | Status Tracer::Close() { return WriteFooter(); } | |
172 | ||
173 | Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles, | |
494da23a | 174 | std::unique_ptr<TraceReader>&& reader) |
11fdf7f2 TL |
175 | : trace_reader_(std::move(reader)) { |
176 | assert(db != nullptr); | |
177 | db_ = static_cast<DBImpl*>(db->GetRootDB()); | |
f67539c2 | 178 | env_ = Env::Default(); |
11fdf7f2 TL |
179 | for (ColumnFamilyHandle* cfh : handles) { |
180 | cf_map_[cfh->GetID()] = cfh; | |
181 | } | |
f67539c2 | 182 | fast_forward_ = 1; |
11fdf7f2 TL |
183 | } |
184 | ||
185 | Replayer::~Replayer() { trace_reader_.reset(); } | |
186 | ||
f67539c2 TL |
187 | Status Replayer::SetFastForward(uint32_t fast_forward) { |
188 | Status s; | |
189 | if (fast_forward < 1) { | |
190 | s = Status::InvalidArgument("Wrong fast forward speed!"); | |
191 | } else { | |
192 | fast_forward_ = fast_forward; | |
193 | s = Status::OK(); | |
194 | } | |
195 | return s; | |
196 | } | |
197 | ||
11fdf7f2 TL |
198 | Status Replayer::Replay() { |
199 | Status s; | |
200 | Trace header; | |
201 | s = ReadHeader(&header); | |
202 | if (!s.ok()) { | |
203 | return s; | |
204 | } | |
205 | ||
206 | std::chrono::system_clock::time_point replay_epoch = | |
207 | std::chrono::system_clock::now(); | |
208 | WriteOptions woptions; | |
209 | ReadOptions roptions; | |
210 | Trace trace; | |
211 | uint64_t ops = 0; | |
212 | Iterator* single_iter = nullptr; | |
213 | while (s.ok()) { | |
214 | trace.reset(); | |
215 | s = ReadTrace(&trace); | |
216 | if (!s.ok()) { | |
217 | break; | |
218 | } | |
219 | ||
220 | std::this_thread::sleep_until( | |
f67539c2 TL |
221 | replay_epoch + |
222 | std::chrono::microseconds((trace.ts - header.ts) / fast_forward_)); | |
11fdf7f2 TL |
223 | if (trace.type == kTraceWrite) { |
224 | WriteBatch batch(trace.payload); | |
225 | db_->Write(woptions, &batch); | |
226 | ops++; | |
227 | } else if (trace.type == kTraceGet) { | |
228 | uint32_t cf_id = 0; | |
229 | Slice key; | |
230 | DecodeCFAndKey(trace.payload, &cf_id, &key); | |
231 | if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { | |
232 | return Status::Corruption("Invalid Column Family ID."); | |
233 | } | |
234 | ||
235 | std::string value; | |
236 | if (cf_id == 0) { | |
237 | db_->Get(roptions, key, &value); | |
238 | } else { | |
239 | db_->Get(roptions, cf_map_[cf_id], key, &value); | |
240 | } | |
241 | ops++; | |
242 | } else if (trace.type == kTraceIteratorSeek) { | |
243 | uint32_t cf_id = 0; | |
244 | Slice key; | |
245 | DecodeCFAndKey(trace.payload, &cf_id, &key); | |
246 | if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { | |
247 | return Status::Corruption("Invalid Column Family ID."); | |
248 | } | |
249 | ||
250 | if (cf_id == 0) { | |
251 | single_iter = db_->NewIterator(roptions); | |
252 | } else { | |
253 | single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); | |
254 | } | |
255 | single_iter->Seek(key); | |
256 | ops++; | |
257 | delete single_iter; | |
258 | } else if (trace.type == kTraceIteratorSeekForPrev) { | |
259 | // Currently, only support to call the Seek() | |
260 | uint32_t cf_id = 0; | |
261 | Slice key; | |
262 | DecodeCFAndKey(trace.payload, &cf_id, &key); | |
263 | if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { | |
264 | return Status::Corruption("Invalid Column Family ID."); | |
265 | } | |
266 | ||
267 | if (cf_id == 0) { | |
268 | single_iter = db_->NewIterator(roptions); | |
269 | } else { | |
270 | single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); | |
271 | } | |
272 | single_iter->SeekForPrev(key); | |
273 | ops++; | |
274 | delete single_iter; | |
275 | } else if (trace.type == kTraceEnd) { | |
276 | // Do nothing for now. | |
277 | // TODO: Add some validations later. | |
278 | break; | |
279 | } | |
280 | } | |
281 | ||
282 | if (s.IsIncomplete()) { | |
283 | // Reaching eof returns Incomplete status at the moment. | |
284 | // Could happen when killing a process without calling EndTrace() API. | |
285 | // TODO: Add better error handling. | |
286 | return Status::OK(); | |
287 | } | |
288 | return s; | |
289 | } | |
290 | ||
f67539c2 TL |
291 | // The trace can be replayed with multithread by configurnge the number of |
292 | // threads in the thread pool. Trace records are read from the trace file | |
293 | // sequentially and the corresponding queries are scheduled in the task | |
294 | // queue based on the timestamp. Currently, we support Write_batch (Put, | |
295 | // Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev). | |
296 | Status Replayer::MultiThreadReplay(uint32_t threads_num) { | |
297 | Status s; | |
298 | Trace header; | |
299 | s = ReadHeader(&header); | |
300 | if (!s.ok()) { | |
301 | return s; | |
302 | } | |
303 | ||
304 | ThreadPoolImpl thread_pool; | |
305 | thread_pool.SetHostEnv(env_); | |
306 | ||
307 | if (threads_num > 1) { | |
308 | thread_pool.SetBackgroundThreads(static_cast<int>(threads_num)); | |
309 | } else { | |
310 | thread_pool.SetBackgroundThreads(1); | |
311 | } | |
312 | ||
313 | std::chrono::system_clock::time_point replay_epoch = | |
314 | std::chrono::system_clock::now(); | |
315 | WriteOptions woptions; | |
316 | ReadOptions roptions; | |
317 | uint64_t ops = 0; | |
318 | while (s.ok()) { | |
319 | std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg); | |
320 | ra->db = db_; | |
321 | s = ReadTrace(&(ra->trace_entry)); | |
322 | if (!s.ok()) { | |
323 | break; | |
324 | } | |
20effc67 | 325 | ra->cf_map = &cf_map_; |
f67539c2 TL |
326 | ra->woptions = woptions; |
327 | ra->roptions = roptions; | |
328 | ||
329 | std::this_thread::sleep_until( | |
330 | replay_epoch + std::chrono::microseconds( | |
331 | (ra->trace_entry.ts - header.ts) / fast_forward_)); | |
332 | if (ra->trace_entry.type == kTraceWrite) { | |
333 | thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr, | |
334 | nullptr); | |
335 | ops++; | |
336 | } else if (ra->trace_entry.type == kTraceGet) { | |
337 | thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr, | |
338 | nullptr); | |
339 | ops++; | |
340 | } else if (ra->trace_entry.type == kTraceIteratorSeek) { | |
341 | thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr, | |
342 | nullptr); | |
343 | ops++; | |
344 | } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) { | |
345 | thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(), | |
346 | nullptr, nullptr); | |
347 | ops++; | |
348 | } else if (ra->trace_entry.type == kTraceEnd) { | |
349 | // Do nothing for now. | |
350 | // TODO: Add some validations later. | |
351 | break; | |
352 | } else { | |
353 | // Other trace entry types that are not implemented for replay. | |
354 | // To finish the replay, we continue the process. | |
355 | continue; | |
356 | } | |
357 | } | |
358 | ||
359 | if (s.IsIncomplete()) { | |
360 | // Reaching eof returns Incomplete status at the moment. | |
361 | // Could happen when killing a process without calling EndTrace() API. | |
362 | // TODO: Add better error handling. | |
363 | s = Status::OK(); | |
364 | } | |
365 | thread_pool.JoinAllThreads(); | |
366 | return s; | |
367 | } | |
368 | ||
11fdf7f2 TL |
369 | Status Replayer::ReadHeader(Trace* header) { |
370 | assert(header != nullptr); | |
371 | Status s = ReadTrace(header); | |
372 | if (!s.ok()) { | |
373 | return s; | |
374 | } | |
375 | if (header->type != kTraceBegin) { | |
376 | return Status::Corruption("Corrupted trace file. Incorrect header."); | |
377 | } | |
378 | if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { | |
379 | return Status::Corruption("Corrupted trace file. Incorrect magic."); | |
380 | } | |
381 | ||
382 | return s; | |
383 | } | |
384 | ||
385 | Status Replayer::ReadFooter(Trace* footer) { | |
386 | assert(footer != nullptr); | |
387 | Status s = ReadTrace(footer); | |
388 | if (!s.ok()) { | |
389 | return s; | |
390 | } | |
391 | if (footer->type != kTraceEnd) { | |
392 | return Status::Corruption("Corrupted trace file. Incorrect footer."); | |
393 | } | |
394 | ||
395 | // TODO: Add more validations later | |
396 | return s; | |
397 | } | |
398 | ||
399 | Status Replayer::ReadTrace(Trace* trace) { | |
400 | assert(trace != nullptr); | |
401 | std::string encoded_trace; | |
402 | Status s = trace_reader_->Read(&encoded_trace); | |
403 | if (!s.ok()) { | |
404 | return s; | |
405 | } | |
f67539c2 TL |
406 | return TracerHelper::DecodeTrace(encoded_trace, trace); |
407 | } | |
11fdf7f2 | 408 | |
f67539c2 TL |
409 | void Replayer::BGWorkGet(void* arg) { |
410 | std::unique_ptr<ReplayerWorkerArg> ra( | |
411 | reinterpret_cast<ReplayerWorkerArg*>(arg)); | |
20effc67 | 412 | assert(ra != nullptr); |
f67539c2 TL |
413 | auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>( |
414 | ra->cf_map); | |
415 | uint32_t cf_id = 0; | |
416 | Slice key; | |
417 | DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); | |
418 | if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { | |
419 | return; | |
420 | } | |
421 | ||
422 | std::string value; | |
423 | if (cf_id == 0) { | |
424 | ra->db->Get(ra->roptions, key, &value); | |
425 | } else { | |
426 | ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value); | |
427 | } | |
428 | ||
429 | return; | |
430 | } | |
431 | ||
432 | void Replayer::BGWorkWriteBatch(void* arg) { | |
433 | std::unique_ptr<ReplayerWorkerArg> ra( | |
434 | reinterpret_cast<ReplayerWorkerArg*>(arg)); | |
20effc67 | 435 | assert(ra != nullptr); |
f67539c2 TL |
436 | WriteBatch batch(ra->trace_entry.payload); |
437 | ra->db->Write(ra->woptions, &batch); | |
438 | return; | |
439 | } | |
440 | ||
441 | void Replayer::BGWorkIterSeek(void* arg) { | |
442 | std::unique_ptr<ReplayerWorkerArg> ra( | |
443 | reinterpret_cast<ReplayerWorkerArg*>(arg)); | |
20effc67 | 444 | assert(ra != nullptr); |
f67539c2 TL |
445 | auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>( |
446 | ra->cf_map); | |
447 | uint32_t cf_id = 0; | |
448 | Slice key; | |
449 | DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); | |
450 | if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { | |
451 | return; | |
452 | } | |
453 | ||
454 | std::string value; | |
455 | Iterator* single_iter = nullptr; | |
456 | if (cf_id == 0) { | |
457 | single_iter = ra->db->NewIterator(ra->roptions); | |
458 | } else { | |
459 | single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]); | |
460 | } | |
461 | single_iter->Seek(key); | |
462 | delete single_iter; | |
463 | return; | |
464 | } | |
465 | ||
466 | void Replayer::BGWorkIterSeekForPrev(void* arg) { | |
467 | std::unique_ptr<ReplayerWorkerArg> ra( | |
468 | reinterpret_cast<ReplayerWorkerArg*>(arg)); | |
20effc67 | 469 | assert(ra != nullptr); |
f67539c2 TL |
470 | auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>( |
471 | ra->cf_map); | |
472 | uint32_t cf_id = 0; | |
473 | Slice key; | |
474 | DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); | |
475 | if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { | |
476 | return; | |
477 | } | |
478 | ||
479 | std::string value; | |
480 | Iterator* single_iter = nullptr; | |
481 | if (cf_id == 0) { | |
482 | single_iter = ra->db->NewIterator(ra->roptions); | |
483 | } else { | |
484 | single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]); | |
485 | } | |
486 | single_iter->SeekForPrev(key); | |
487 | delete single_iter; | |
488 | return; | |
11fdf7f2 TL |
489 | } |
490 | ||
f67539c2 | 491 | } // namespace ROCKSDB_NAMESPACE |