1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "trace_replay/trace_replay.h"
12 #include "db/db_impl/db_impl.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/iterator.h"
15 #include "rocksdb/options.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/system_clock.h"
18 #include "rocksdb/trace_reader_writer.h"
19 #include "rocksdb/write_batch.h"
20 #include "util/coding.h"
21 #include "util/string_util.h"
23 namespace ROCKSDB_NAMESPACE
{
// Magic string written at the very start of the trace header payload and
// compared against the payload prefix when a header is decoded.
const std::string kTraceMagic("feedcafedeadbeef");
28 void DecodeCFAndKey(std::string
& buffer
, uint32_t* cf_id
, Slice
* key
) {
30 GetFixed32(&buf
, cf_id
);
31 GetLengthPrefixedSlice(&buf
, key
);
35 Status
TracerHelper::ParseVersionStr(std::string
& v_string
, int* v_num
) {
36 if (v_string
.find_first_of('.') == std::string::npos
||
37 v_string
.find_first_of('.') != v_string
.find_last_of('.')) {
38 return Status::Corruption(
39 "Corrupted trace file. Incorrect version format.");
42 for (int i
= 0; i
< static_cast<int>(v_string
.size()); i
++) {
43 if (v_string
[i
] == '.') {
45 } else if (isdigit(v_string
[i
])) {
46 tmp_num
= tmp_num
* 10 + (v_string
[i
] - '0');
48 return Status::Corruption(
49 "Corrupted trace file. Incorrect version format");
56 Status
TracerHelper::ParseTraceHeader(const Trace
& header
, int* trace_version
,
58 std::vector
<std::string
> s_vec
;
60 for (int i
= 0; i
< 3; i
++) {
61 assert(header
.payload
.find("\t", begin
) != std::string::npos
);
62 end
= static_cast<int>(header
.payload
.find("\t", begin
));
63 s_vec
.push_back(header
.payload
.substr(begin
, end
- begin
));
67 std::string t_v_str
, db_v_str
;
68 assert(s_vec
.size() == 3);
69 assert(s_vec
[1].find("Trace Version: ") != std::string::npos
);
70 t_v_str
= s_vec
[1].substr(15);
71 assert(s_vec
[2].find("RocksDB Version: ") != std::string::npos
);
72 db_v_str
= s_vec
[2].substr(17);
75 s
= ParseVersionStr(t_v_str
, trace_version
);
76 if (s
!= Status::OK()) {
79 s
= ParseVersionStr(db_v_str
, db_version
);
83 void TracerHelper::EncodeTrace(const Trace
& trace
, std::string
* encoded_trace
) {
84 assert(encoded_trace
);
85 PutFixed64(encoded_trace
, trace
.ts
);
86 encoded_trace
->push_back(trace
.type
);
87 PutFixed32(encoded_trace
, static_cast<uint32_t>(trace
.payload
.size()));
88 encoded_trace
->append(trace
.payload
);
91 Status
TracerHelper::DecodeTrace(const std::string
& encoded_trace
,
93 assert(trace
!= nullptr);
94 Slice enc_slice
= Slice(encoded_trace
);
95 if (!GetFixed64(&enc_slice
, &trace
->ts
)) {
96 return Status::Incomplete("Decode trace string failed");
98 if (enc_slice
.size() < kTraceTypeSize
+ kTracePayloadLengthSize
) {
99 return Status::Incomplete("Decode trace string failed");
101 trace
->type
= static_cast<TraceType
>(enc_slice
[0]);
102 enc_slice
.remove_prefix(kTraceTypeSize
+ kTracePayloadLengthSize
);
103 trace
->payload
= enc_slice
.ToString();
107 Status
TracerHelper::DecodeHeader(const std::string
& encoded_trace
,
109 Status s
= TracerHelper::DecodeTrace(encoded_trace
, header
);
111 if (header
->type
!= kTraceBegin
) {
112 return Status::Corruption("Corrupted trace file. Incorrect header.");
114 if (header
->payload
.substr(0, kTraceMagic
.length()) != kTraceMagic
) {
115 return Status::Corruption("Corrupted trace file. Incorrect magic.");
121 bool TracerHelper::SetPayloadMap(uint64_t& payload_map
,
122 const TracePayloadType payload_type
) {
123 uint64_t old_state
= payload_map
;
125 payload_map
|= (tmp
<< payload_type
);
126 return old_state
!= payload_map
;
129 Status
TracerHelper::DecodeTraceRecord(Trace
* trace
, int trace_file_version
,
130 std::unique_ptr
<TraceRecord
>* record
) {
131 assert(trace
!= nullptr);
133 if (record
!= nullptr) {
134 record
->reset(nullptr);
137 switch (trace
->type
) {
141 if (trace_file_version
< 2) {
142 rep
.PinSelf(trace
->payload
);
144 Slice
buf(trace
->payload
);
145 GetFixed64(&buf
, &trace
->payload_map
);
146 int64_t payload_map
= static_cast<int64_t>(trace
->payload_map
);
147 Slice write_batch_data
;
148 while (payload_map
) {
149 // Find the rightmost set bit.
151 static_cast<uint32_t>(log2(payload_map
& -payload_map
));
153 case TracePayloadType::kWriteBatchData
: {
154 GetLengthPrefixedSlice(&buf
, &write_batch_data
);
161 // unset the rightmost bit.
162 payload_map
&= (payload_map
- 1);
164 rep
.PinSelf(write_batch_data
);
167 if (record
!= nullptr) {
168 record
->reset(new WriteQueryTraceRecord(std::move(rep
), trace
->ts
));
178 if (trace_file_version
< 2) {
179 DecodeCFAndKey(trace
->payload
, &cf_id
, &get_key
);
181 Slice
buf(trace
->payload
);
182 GetFixed64(&buf
, &trace
->payload_map
);
183 int64_t payload_map
= static_cast<int64_t>(trace
->payload_map
);
184 while (payload_map
) {
185 // Find the rightmost set bit.
187 static_cast<uint32_t>(log2(payload_map
& -payload_map
));
189 case TracePayloadType::kGetCFID
: {
190 GetFixed32(&buf
, &cf_id
);
193 case TracePayloadType::kGetKey
: {
194 GetLengthPrefixedSlice(&buf
, &get_key
);
201 // unset the rightmost bit.
202 payload_map
&= (payload_map
- 1);
206 if (record
!= nullptr) {
209 record
->reset(new GetQueryTraceRecord(cf_id
, std::move(ps
), trace
->ts
));
214 // Iterator Seek and SeekForPrev
215 case kTraceIteratorSeek
:
216 case kTraceIteratorSeekForPrev
: {
222 if (trace_file_version
< 2) {
223 DecodeCFAndKey(trace
->payload
, &cf_id
, &iter_key
);
225 Slice
buf(trace
->payload
);
226 GetFixed64(&buf
, &trace
->payload_map
);
227 int64_t payload_map
= static_cast<int64_t>(trace
->payload_map
);
228 while (payload_map
) {
229 // Find the rightmost set bit.
231 static_cast<uint32_t>(log2(payload_map
& -payload_map
));
233 case TracePayloadType::kIterCFID
: {
234 GetFixed32(&buf
, &cf_id
);
237 case TracePayloadType::kIterKey
: {
238 GetLengthPrefixedSlice(&buf
, &iter_key
);
241 case TracePayloadType::kIterLowerBound
: {
242 GetLengthPrefixedSlice(&buf
, &lower_bound
);
245 case TracePayloadType::kIterUpperBound
: {
246 GetLengthPrefixedSlice(&buf
, &upper_bound
);
253 // unset the rightmost bit.
254 payload_map
&= (payload_map
- 1);
258 if (record
!= nullptr) {
259 PinnableSlice ps_key
;
260 ps_key
.PinSelf(iter_key
);
261 PinnableSlice ps_lower
;
262 ps_lower
.PinSelf(lower_bound
);
263 PinnableSlice ps_upper
;
264 ps_upper
.PinSelf(upper_bound
);
265 record
->reset(new IteratorSeekQueryTraceRecord(
266 static_cast<IteratorSeekQueryTraceRecord::SeekType
>(trace
->type
),
267 cf_id
, std::move(ps_key
), std::move(ps_lower
), std::move(ps_upper
),
274 case kTraceMultiGet
: {
275 if (trace_file_version
< 2) {
276 return Status::Corruption("MultiGet is not supported.");
279 uint32_t multiget_size
= 0;
280 std::vector
<uint32_t> cf_ids
;
281 std::vector
<PinnableSlice
> multiget_keys
;
285 Slice
buf(trace
->payload
);
286 GetFixed64(&buf
, &trace
->payload_map
);
287 int64_t payload_map
= static_cast<int64_t>(trace
->payload_map
);
288 while (payload_map
) {
289 // Find the rightmost set bit.
291 static_cast<uint32_t>(log2(payload_map
& -payload_map
));
293 case TracePayloadType::kMultiGetSize
: {
294 GetFixed32(&buf
, &multiget_size
);
297 case TracePayloadType::kMultiGetCFIDs
: {
298 GetLengthPrefixedSlice(&buf
, &cfids_payload
);
301 case TracePayloadType::kMultiGetKeys
: {
302 GetLengthPrefixedSlice(&buf
, &keys_payload
);
309 // unset the rightmost bit.
310 payload_map
&= (payload_map
- 1);
312 if (multiget_size
== 0) {
313 return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
316 // Decode the cfids_payload and keys_payload
317 cf_ids
.reserve(multiget_size
);
318 multiget_keys
.reserve(multiget_size
);
319 for (uint32_t i
= 0; i
< multiget_size
; i
++) {
322 GetFixed32(&cfids_payload
, &tmp_cfid
);
323 GetLengthPrefixedSlice(&keys_payload
, &tmp_key
);
324 cf_ids
.push_back(tmp_cfid
);
328 multiget_keys
.push_back(std::move(ps
));
331 if (record
!= nullptr) {
332 record
->reset(new MultiGetQueryTraceRecord(
333 std::move(cf_ids
), std::move(multiget_keys
), trace
->ts
));
339 return Status::NotSupported("Unsupported trace type.");
343 Tracer::Tracer(SystemClock
* clock
, const TraceOptions
& trace_options
,
344 std::unique_ptr
<TraceWriter
>&& trace_writer
)
346 trace_options_(trace_options
),
347 trace_writer_(std::move(trace_writer
)),
348 trace_request_count_(0) {
349 // TODO: What if this fails?
350 WriteHeader().PermitUncheckedError();
353 Tracer::~Tracer() { trace_writer_
.reset(); }
355 Status
Tracer::Write(WriteBatch
* write_batch
) {
356 TraceType trace_type
= kTraceWrite
;
357 if (ShouldSkipTrace(trace_type
)) {
361 trace
.ts
= clock_
->NowMicros();
362 trace
.type
= trace_type
;
363 TracerHelper::SetPayloadMap(trace
.payload_map
,
364 TracePayloadType::kWriteBatchData
);
365 PutFixed64(&trace
.payload
, trace
.payload_map
);
366 PutLengthPrefixedSlice(&trace
.payload
, Slice(write_batch
->Data()));
367 return WriteTrace(trace
);
370 Status
Tracer::Get(ColumnFamilyHandle
* column_family
, const Slice
& key
) {
371 TraceType trace_type
= kTraceGet
;
372 if (ShouldSkipTrace(trace_type
)) {
376 trace
.ts
= clock_
->NowMicros();
377 trace
.type
= trace_type
;
378 // Set the payloadmap of the struct member that will be encoded in the
380 TracerHelper::SetPayloadMap(trace
.payload_map
, TracePayloadType::kGetCFID
);
381 TracerHelper::SetPayloadMap(trace
.payload_map
, TracePayloadType::kGetKey
);
382 // Encode the Get struct members into payload. Make sure add them in order.
383 PutFixed64(&trace
.payload
, trace
.payload_map
);
384 PutFixed32(&trace
.payload
, column_family
->GetID());
385 PutLengthPrefixedSlice(&trace
.payload
, key
);
386 return WriteTrace(trace
);
389 Status
Tracer::IteratorSeek(const uint32_t& cf_id
, const Slice
& key
,
390 const Slice
& lower_bound
, const Slice upper_bound
) {
391 TraceType trace_type
= kTraceIteratorSeek
;
392 if (ShouldSkipTrace(trace_type
)) {
396 trace
.ts
= clock_
->NowMicros();
397 trace
.type
= trace_type
;
398 // Set the payloadmap of the struct member that will be encoded in the
400 TracerHelper::SetPayloadMap(trace
.payload_map
, TracePayloadType::kIterCFID
);
401 TracerHelper::SetPayloadMap(trace
.payload_map
, TracePayloadType::kIterKey
);
402 if (lower_bound
.size() > 0) {
403 TracerHelper::SetPayloadMap(trace
.payload_map
,
404 TracePayloadType::kIterLowerBound
);
406 if (upper_bound
.size() > 0) {
407 TracerHelper::SetPayloadMap(trace
.payload_map
,
408 TracePayloadType::kIterUpperBound
);
410 // Encode the Iterator struct members into payload. Make sure add them in
412 PutFixed64(&trace
.payload
, trace
.payload_map
);
413 PutFixed32(&trace
.payload
, cf_id
);
414 PutLengthPrefixedSlice(&trace
.payload
, key
);
415 if (lower_bound
.size() > 0) {
416 PutLengthPrefixedSlice(&trace
.payload
, lower_bound
);
418 if (upper_bound
.size() > 0) {
419 PutLengthPrefixedSlice(&trace
.payload
, upper_bound
);
421 return WriteTrace(trace
);
424 Status
Tracer::IteratorSeekForPrev(const uint32_t& cf_id
, const Slice
& key
,
425 const Slice
& lower_bound
,
426 const Slice upper_bound
) {
427 TraceType trace_type
= kTraceIteratorSeekForPrev
;
428 if (ShouldSkipTrace(trace_type
)) {
432 trace
.ts
= clock_
->NowMicros();
433 trace
.type
= trace_type
;
434 // Set the payloadmap of the struct member that will be encoded in the
436 TracerHelper::SetPayloadMap(trace
.payload_map
, TracePayloadType::kIterCFID
);
437 TracerHelper::SetPayloadMap(trace
.payload_map
, TracePayloadType::kIterKey
);
438 if (lower_bound
.size() > 0) {
439 TracerHelper::SetPayloadMap(trace
.payload_map
,
440 TracePayloadType::kIterLowerBound
);
442 if (upper_bound
.size() > 0) {
443 TracerHelper::SetPayloadMap(trace
.payload_map
,
444 TracePayloadType::kIterUpperBound
);
446 // Encode the Iterator struct members into payload. Make sure add them in
448 PutFixed64(&trace
.payload
, trace
.payload_map
);
449 PutFixed32(&trace
.payload
, cf_id
);
450 PutLengthPrefixedSlice(&trace
.payload
, key
);
451 if (lower_bound
.size() > 0) {
452 PutLengthPrefixedSlice(&trace
.payload
, lower_bound
);
454 if (upper_bound
.size() > 0) {
455 PutLengthPrefixedSlice(&trace
.payload
, upper_bound
);
457 return WriteTrace(trace
);
460 Status
Tracer::MultiGet(const size_t num_keys
,
461 ColumnFamilyHandle
** column_families
,
466 std::vector
<ColumnFamilyHandle
*> v_column_families
;
467 std::vector
<Slice
> v_keys
;
468 v_column_families
.resize(num_keys
);
469 v_keys
.resize(num_keys
);
470 for (size_t i
= 0; i
< num_keys
; i
++) {
471 v_column_families
[i
] = column_families
[i
];
474 return MultiGet(v_column_families
, v_keys
);
477 Status
Tracer::MultiGet(const size_t num_keys
,
478 ColumnFamilyHandle
* column_family
, const Slice
* keys
) {
482 std::vector
<ColumnFamilyHandle
*> column_families
;
483 std::vector
<Slice
> v_keys
;
484 column_families
.resize(num_keys
);
485 v_keys
.resize(num_keys
);
486 for (size_t i
= 0; i
< num_keys
; i
++) {
487 column_families
[i
] = column_family
;
490 return MultiGet(column_families
, v_keys
);
493 Status
Tracer::MultiGet(const std::vector
<ColumnFamilyHandle
*>& column_families
,
494 const std::vector
<Slice
>& keys
) {
495 if (column_families
.size() != keys
.size()) {
496 return Status::Corruption("the CFs size and keys size does not match!");
498 TraceType trace_type
= kTraceMultiGet
;
499 if (ShouldSkipTrace(trace_type
)) {
502 uint32_t multiget_size
= static_cast<uint32_t>(keys
.size());
504 trace
.ts
= clock_
->NowMicros();
505 trace
.type
= trace_type
;
506 // Set the payloadmap of the struct member that will be encoded in the
508 TracerHelper::SetPayloadMap(trace
.payload_map
,
509 TracePayloadType::kMultiGetSize
);
510 TracerHelper::SetPayloadMap(trace
.payload_map
,
511 TracePayloadType::kMultiGetCFIDs
);
512 TracerHelper::SetPayloadMap(trace
.payload_map
,
513 TracePayloadType::kMultiGetKeys
);
514 // Encode the CFIDs inorder
515 std::string cfids_payload
;
516 std::string keys_payload
;
517 for (uint32_t i
= 0; i
< multiget_size
; i
++) {
518 assert(i
< column_families
.size());
519 assert(i
< keys
.size());
520 PutFixed32(&cfids_payload
, column_families
[i
]->GetID());
521 PutLengthPrefixedSlice(&keys_payload
, keys
[i
]);
523 // Encode the Get struct members into payload. Make sure add them in order.
524 PutFixed64(&trace
.payload
, trace
.payload_map
);
525 PutFixed32(&trace
.payload
, multiget_size
);
526 PutLengthPrefixedSlice(&trace
.payload
, cfids_payload
);
527 PutLengthPrefixedSlice(&trace
.payload
, keys_payload
);
528 return WriteTrace(trace
);
531 bool Tracer::ShouldSkipTrace(const TraceType
& trace_type
) {
532 if (IsTraceFileOverMax()) {
536 TraceFilterType filter_mask
= kTraceFilterNone
;
537 switch (trace_type
) {
541 filter_mask
= kTraceFilterNone
;
544 filter_mask
= kTraceFilterWrite
;
547 filter_mask
= kTraceFilterGet
;
549 case kTraceIteratorSeek
:
550 filter_mask
= kTraceFilterIteratorSeek
;
552 case kTraceIteratorSeekForPrev
:
553 filter_mask
= kTraceFilterIteratorSeekForPrev
;
555 case kBlockTraceIndexBlock
:
556 case kBlockTraceFilterBlock
:
557 case kBlockTraceDataBlock
:
558 case kBlockTraceUncompressionDictBlock
:
559 case kBlockTraceRangeDeletionBlock
:
561 filter_mask
= kTraceFilterNone
;
564 filter_mask
= kTraceFilterMultiGet
;
568 filter_mask
= kTraceFilterNone
;
571 if (filter_mask
!= kTraceFilterNone
&& trace_options_
.filter
& filter_mask
) {
575 ++trace_request_count_
;
576 if (trace_request_count_
< trace_options_
.sampling_frequency
) {
579 trace_request_count_
= 0;
583 bool Tracer::IsTraceFileOverMax() {
584 uint64_t trace_file_size
= trace_writer_
->GetFileSize();
585 return (trace_file_size
> trace_options_
.max_trace_file_size
);
588 Status
Tracer::WriteHeader() {
589 std::ostringstream s
;
590 s
<< kTraceMagic
<< "\t"
591 << "Trace Version: " << kTraceFileMajorVersion
<< "."
592 << kTraceFileMinorVersion
<< "\t"
593 << "RocksDB Version: " << kMajorVersion
<< "." << kMinorVersion
<< "\t"
594 << "Format: Timestamp OpType Payload\n";
595 std::string
header(s
.str());
598 trace
.ts
= clock_
->NowMicros();
599 trace
.type
= kTraceBegin
;
600 trace
.payload
= header
;
601 return WriteTrace(trace
);
604 Status
Tracer::WriteFooter() {
606 trace
.ts
= clock_
->NowMicros();
607 trace
.type
= kTraceEnd
;
608 TracerHelper::SetPayloadMap(trace
.payload_map
,
609 TracePayloadType::kEmptyPayload
);
611 return WriteTrace(trace
);
614 Status
Tracer::WriteTrace(const Trace
& trace
) {
615 std::string encoded_trace
;
616 TracerHelper::EncodeTrace(trace
, &encoded_trace
);
617 return trace_writer_
->Write(Slice(encoded_trace
));
620 Status
Tracer::Close() { return WriteFooter(); }
622 } // namespace ROCKSDB_NAMESPACE