// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <cinttypes>
#include <cstdint>
#include <cstdlib>
#include <limits>

#include "db/db_impl/db_impl.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/utilities/ldb_cmd.h"
#include "rocksdb/write_batch.h"
#include "table/meta_blocks.h"
#include "table/table_reader.h"
#include "tools/trace_analyzer_tool.h"
#include "trace_replay/trace_replay.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/gflags_compat.h"
#include "util/random.h"
#include "util/string_util.h"
51 using GFLAGS_NAMESPACE::ParseCommandLineFlags
;
53 DEFINE_string(trace_path
, "", "The trace file path.");
54 DEFINE_string(output_dir
, "", "The directory to store the output files.");
55 DEFINE_string(output_prefix
, "trace",
56 "The prefix used for all the output files.");
57 DEFINE_bool(output_key_stats
, false,
58 "Output the key access count statistics to file\n"
59 "for accessed keys:\n"
60 "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n"
61 "Format:[cf_id value_size access_keyid access_count]\n"
62 "for the whole key space keys:\n"
63 "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n"
64 "Format:[whole_key_space_keyid access_count]");
65 DEFINE_bool(output_access_count_stats
, false,
66 "Output the access count distribution statistics to file.\n"
67 "File name: <prefix>-<query_type>-<cf_id>-accessed_"
68 "key_count_distribution.txt \n"
69 "Format:[access_count number_of_access_count]");
70 DEFINE_bool(output_time_series
, false,
71 "Output the access time in second of each key, "
72 "such that we can have the time series data of the queries \n"
73 "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n"
74 "Format:[type_id time_in_sec access_keyid].");
75 DEFINE_bool(try_process_corrupted_trace
, false,
76 "In default, trace_analyzer will exit if the trace file is "
77 "corrupted due to the unexpected tracing cases. If this option "
78 "is enabled, trace_analyzer will stop reading the trace file, "
79 "and start analyzing the read-in data.");
80 DEFINE_int32(output_prefix_cut
, 0,
81 "The number of bytes as prefix to cut the keys.\n"
82 "If it is enabled, it will generate the following:\n"
83 "For accessed keys:\n"
84 "File name: <prefix>-<query_type>-<cf_id>-"
85 "accessed_key_prefix_cut.txt \n"
86 "Format:[acessed_keyid access_count_of_prefix "
87 "number_of_keys_in_prefix average_key_access "
88 "prefix_succ_ratio prefix]\n"
89 "For whole key space keys:\n"
90 "File name: <prefix>-<query_type>-<cf_id>"
91 "-whole_key_prefix_cut.txt\n"
92 "Format:[start_keyid_in_whole_keyspace prefix]\n"
93 "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
94 "File name: <prefix>-<query_type>-<cf_id>"
95 "-accessed_top_k_qps_prefix_cut.txt\n"
96 "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
97 DEFINE_bool(convert_to_human_readable_trace
, false,
98 "Convert the binary trace file to a human readable txt file "
99 "for further processing. "
100 "This file will be extremely large "
101 "(similar size as the original binary trace file). "
102 "You can specify 'no_key' to reduce the size, if key is not "
103 "needed in the next step.\n"
104 "File name: <prefix>_human_readable_trace.txt\n"
105 "Format:[type_id cf_id value_size time_in_micorsec <key>].");
106 DEFINE_bool(output_qps_stats
, false,
107 "Output the query per second(qps) statistics \n"
108 "For the overall qps, it will contain all qps of each query type. "
109 "The time is started from the first trace record\n"
110 "File name: <prefix>_qps_stats.txt\n"
111 "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
112 "For each cf and query, it will have its own qps output.\n"
113 "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n"
114 "Format:[query_count_in_this_second].");
115 DEFINE_bool(no_print
, false, "Do not print out any result");
117 print_correlation
, "",
118 "intput format: [correlation pairs][.,.]\n"
119 "Output the query correlations between the pairs of query types "
120 "listed in the parameter, input should select the operations from:\n"
121 "get, put, delete, single_delete, rangle_delete, merge. No space "
122 "between the pairs separated by commar. Example: =[get,get]... "
123 "It will print out the number of pairs of 'A after B' and "
124 "the average time interval between the two query.");
125 DEFINE_string(key_space_dir
, "",
126 "<the directory stores full key space files> \n"
127 "The key space files should be: <column family id>.txt");
128 DEFINE_bool(analyze_get
, false, "Analyze the Get query.");
129 DEFINE_bool(analyze_put
, false, "Analyze the Put query.");
130 DEFINE_bool(analyze_delete
, false, "Analyze the Delete query.");
131 DEFINE_bool(analyze_single_delete
, false, "Analyze the SingleDelete query.");
132 DEFINE_bool(analyze_range_delete
, false, "Analyze the DeleteRange query.");
133 DEFINE_bool(analyze_merge
, false, "Analyze the Merge query.");
134 DEFINE_bool(analyze_iterator
, false,
135 " Analyze the iterate query like seek() and seekForPrev().");
136 DEFINE_bool(no_key
, false,
137 " Does not output the key to the result files to make smaller.");
138 DEFINE_bool(print_overall_stats
, true,
139 " Print the stats of the whole trace, "
140 "like total requests, keys, and etc.");
141 DEFINE_bool(output_key_distribution
, false, "Print the key size distribution.");
143 output_value_distribution
, false,
144 "Out put the value size distribution, only available for Put and Merge.\n"
145 "File name: <prefix>-<query_type>-<cf_id>"
146 "-accessed_value_size_distribution.txt\n"
147 "Format:[Number_of_value_size_between x and "
148 "x+value_interval is: <the count>]");
149 DEFINE_int32(print_top_k_access
, 1,
150 "<top K of the variables to be printed> "
151 "Print the top k accessed keys, top k accessed prefix "
153 DEFINE_int32(output_ignore_count
, 0,
154 "<threshold>, ignores the access count <= this value, "
155 "it will shorter the output.");
156 DEFINE_int32(value_interval
, 8,
157 "To output the value distribution, we need to set the value "
158 "intervals and make the statistic of the value size distribution "
159 "in different intervals. The default is 8.");
160 DEFINE_double(sample_ratio
, 1.0,
161 "If the trace size is extremely huge or user want to sample "
162 "the trace when analyzing, sample ratio can be set (0, 1.0]");
164 namespace ROCKSDB_NAMESPACE
{
166 std::map
<std::string
, int> taOptToIndex
= {
167 {"get", 0}, {"put", 1},
168 {"delete", 2}, {"single_delete", 3},
169 {"range_delete", 4}, {"merge", 5},
170 {"iterator_Seek", 6}, {"iterator_SeekForPrev", 7}};
172 std::map
<int, std::string
> taIndexToOpt
= {
173 {0, "get"}, {1, "put"},
174 {2, "delete"}, {3, "single_delete"},
175 {4, "range_delete"}, {5, "merge"},
176 {6, "iterator_Seek"}, {7, "iterator_SeekForPrev"}};
// Overflow-checked multiply.
// Returns 0 when either operand is 0; returns op1 unchanged when op1 * op2
// would overflow uint64_t (sentinel behavior kept from the original);
// otherwise returns the product.
// NOTE(review): the return statements were missing from the corrupted
// listing and are restored here; port::kMaxUint64 is replaced by the
// equivalent std::numeric_limits<uint64_t>::max().
uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
  if (op1 == 0 || op2 == 0) {
    return 0;
  }
  // op1 * op2 overflows iff op2 > max / op1 (safe because op1 != 0).
  if (std::numeric_limits<uint64_t>::max() / op1 < op2) {
    return op1;
  }
  return (op1 * op2);
}
190 void DecodeCFAndKeyFromString(std::string
& buffer
, uint32_t* cf_id
, Slice
* key
) {
192 GetFixed32(&buf
, cf_id
);
193 GetLengthPrefixedSlice(&buf
, key
);
198 // The default constructor of AnalyzerOptions
199 AnalyzerOptions::AnalyzerOptions()
200 : correlation_map(kTaTypeNum
, std::vector
<int>(kTaTypeNum
, -1)) {}
202 AnalyzerOptions::~AnalyzerOptions() {}
204 void AnalyzerOptions::SparseCorrelationInput(const std::string
& in_str
) {
205 std::string cur
= in_str
;
206 if (cur
.size() == 0) {
209 while (!cur
.empty()) {
210 if (cur
.compare(0, 1, "[") != 0) {
211 fprintf(stderr
, "Invalid correlation input: %s\n", in_str
.c_str());
214 std::string opt1
, opt2
;
215 std::size_t split
= cur
.find_first_of(",");
216 if (split
!= std::string::npos
) {
217 opt1
= cur
.substr(1, split
- 1);
219 fprintf(stderr
, "Invalid correlation input: %s\n", in_str
.c_str());
222 std::size_t end
= cur
.find_first_of("]");
223 if (end
!= std::string::npos
) {
224 opt2
= cur
.substr(split
+ 1, end
- split
- 1);
226 fprintf(stderr
, "Invalid correlation input: %s\n", in_str
.c_str());
229 cur
= cur
.substr(end
+ 1);
231 if (taOptToIndex
.find(opt1
) != taOptToIndex
.end() &&
232 taOptToIndex
.find(opt2
) != taOptToIndex
.end()) {
233 correlation_list
.push_back(
234 std::make_pair(taOptToIndex
[opt1
], taOptToIndex
[opt2
]));
236 fprintf(stderr
, "Invalid correlation input: %s\n", in_str
.c_str());
242 for (auto& it
: correlation_list
) {
243 correlation_map
[it
.first
][it
.second
] = sequence
;
249 // The trace statistic struct constructor
250 TraceStats::TraceStats() {
255 a_key_size_sqsum
= 0;
258 a_value_size_sqsum
= 0;
259 a_value_size_sum
= 0;
265 TraceStats::~TraceStats() {}
267 // The trace analyzer constructor
268 TraceAnalyzer::TraceAnalyzer(std::string
& trace_path
, std::string
& output_path
,
269 AnalyzerOptions _analyzer_opts
)
270 : trace_name_(trace_path
),
271 output_path_(output_path
),
272 analyzer_opts_(_analyzer_opts
) {
273 ROCKSDB_NAMESPACE::EnvOptions env_options
;
274 env_
= ROCKSDB_NAMESPACE::Env::Default();
278 total_access_keys_
= 0;
281 trace_create_time_
= 0;
284 time_series_start_
= 0;
286 if (FLAGS_sample_ratio
> 1.0 || FLAGS_sample_ratio
<= 0) {
289 sample_max_
= static_cast<uint32_t>(1.0 / FLAGS_sample_ratio
);
292 ta_
.resize(kTaTypeNum
);
293 ta_
[0].type_name
= "get";
294 if (FLAGS_analyze_get
) {
295 ta_
[0].enabled
= true;
297 ta_
[0].enabled
= false;
299 ta_
[1].type_name
= "put";
300 if (FLAGS_analyze_put
) {
301 ta_
[1].enabled
= true;
303 ta_
[1].enabled
= false;
305 ta_
[2].type_name
= "delete";
306 if (FLAGS_analyze_delete
) {
307 ta_
[2].enabled
= true;
309 ta_
[2].enabled
= false;
311 ta_
[3].type_name
= "single_delete";
312 if (FLAGS_analyze_single_delete
) {
313 ta_
[3].enabled
= true;
315 ta_
[3].enabled
= false;
317 ta_
[4].type_name
= "range_delete";
318 if (FLAGS_analyze_range_delete
) {
319 ta_
[4].enabled
= true;
321 ta_
[4].enabled
= false;
323 ta_
[5].type_name
= "merge";
324 if (FLAGS_analyze_merge
) {
325 ta_
[5].enabled
= true;
327 ta_
[5].enabled
= false;
329 ta_
[6].type_name
= "iterator_Seek";
330 if (FLAGS_analyze_iterator
) {
331 ta_
[6].enabled
= true;
333 ta_
[6].enabled
= false;
335 ta_
[7].type_name
= "iterator_SeekForPrev";
336 if (FLAGS_analyze_iterator
) {
337 ta_
[7].enabled
= true;
339 ta_
[7].enabled
= false;
341 for (int i
= 0; i
< kTaTypeNum
; i
++) {
342 ta_
[i
].sample_count
= 0;
346 TraceAnalyzer::~TraceAnalyzer() {}
348 // Prepare the processing
349 // Initiate the global trace reader and writer here
350 Status
TraceAnalyzer::PrepareProcessing() {
352 // Prepare the trace reader
353 s
= NewFileTraceReader(env_
, env_options_
, trace_name_
, &trace_reader_
);
358 // Prepare and open the trace sequence file writer if needed
359 if (FLAGS_convert_to_human_readable_trace
) {
360 std::string trace_sequence_name
;
361 trace_sequence_name
=
362 output_path_
+ "/" + FLAGS_output_prefix
+ "-human_readable_trace.txt";
363 s
= env_
->NewWritableFile(trace_sequence_name
, &trace_sequence_f_
,
370 // prepare the general QPS file writer
371 if (FLAGS_output_qps_stats
) {
372 std::string qps_stats_name
;
374 output_path_
+ "/" + FLAGS_output_prefix
+ "-qps_stats.txt";
375 s
= env_
->NewWritableFile(qps_stats_name
, &qps_f_
, env_options_
);
381 output_path_
+ "/" + FLAGS_output_prefix
+ "-cf_qps_stats.txt";
382 s
= env_
->NewWritableFile(qps_stats_name
, &cf_qps_f_
, env_options_
);
390 Status
TraceAnalyzer::ReadTraceHeader(Trace
* header
) {
391 assert(header
!= nullptr);
392 Status s
= ReadTraceRecord(header
);
396 if (header
->type
!= kTraceBegin
) {
397 return Status::Corruption("Corrupted trace file. Incorrect header.");
399 if (header
->payload
.substr(0, kTraceMagic
.length()) != kTraceMagic
) {
400 return Status::Corruption("Corrupted trace file. Incorrect magic.");
406 Status
TraceAnalyzer::ReadTraceFooter(Trace
* footer
) {
407 assert(footer
!= nullptr);
408 Status s
= ReadTraceRecord(footer
);
412 if (footer
->type
!= kTraceEnd
) {
413 return Status::Corruption("Corrupted trace file. Incorrect footer.");
418 Status
TraceAnalyzer::ReadTraceRecord(Trace
* trace
) {
419 assert(trace
!= nullptr);
420 std::string encoded_trace
;
421 Status s
= trace_reader_
->Read(&encoded_trace
);
426 Slice enc_slice
= Slice(encoded_trace
);
427 GetFixed64(&enc_slice
, &trace
->ts
);
428 trace
->type
= static_cast<TraceType
>(enc_slice
[0]);
429 enc_slice
.remove_prefix(kTraceTypeSize
+ kTracePayloadLengthSize
);
430 trace
->payload
= enc_slice
.ToString();
434 // process the trace itself and redirect the trace content
435 // to different operation type handler. With different race
436 // format, this function can be changed
437 Status
TraceAnalyzer::StartProcessing() {
440 s
= ReadTraceHeader(&header
);
442 fprintf(stderr
, "Cannot read the header\n");
445 trace_create_time_
= header
.ts
;
446 if (FLAGS_output_time_series
) {
447 time_series_start_
= header
.ts
;
453 s
= ReadTraceRecord(&trace
);
459 end_time_
= trace
.ts
;
460 if (trace
.type
== kTraceWrite
) {
463 WriteBatch
batch(trace
.payload
);
465 // Note that, if the write happens in a transaction,
466 // 'Write' will be called twice, one for Prepare, one for
467 // Commit. Thus, in the trace, for the same WriteBatch, there
468 // will be two reords if it is in a transaction. Here, we only
469 // process the reord that is committed. If write is non-transaction,
470 // HasBeginPrepare()==false, so we process it normally.
471 if (batch
.HasBeginPrepare() && !batch
.HasCommit()) {
474 TraceWriteHandler
write_handler(this);
475 s
= batch
.Iterate(&write_handler
);
477 fprintf(stderr
, "Cannot process the write batch in the trace\n");
480 } else if (trace
.type
== kTraceGet
) {
483 DecodeCFAndKeyFromString(trace
.payload
, &cf_id
, &key
);
486 s
= HandleGet(cf_id
, key
.ToString(), trace
.ts
, 1);
488 fprintf(stderr
, "Cannot process the get in the trace\n");
491 } else if (trace
.type
== kTraceIteratorSeek
||
492 trace
.type
== kTraceIteratorSeekForPrev
) {
495 DecodeCFAndKeyFromString(trace
.payload
, &cf_id
, &key
);
496 s
= HandleIter(cf_id
, key
.ToString(), trace
.ts
, trace
.type
);
498 fprintf(stderr
, "Cannot process the iterator in the trace\n");
501 } else if (trace
.type
== kTraceEnd
) {
505 if (s
.IsIncomplete()) {
506 // Fix it: Reaching eof returns Incomplete status at the moment.
513 // After the trace is processed by StartProcessing, the statistic data
514 // is stored in the map or other in memory data structures. To get the
515 // other statistic result such as key size distribution, value size
516 // distribution, these data structures are re-processed here.
517 Status
TraceAnalyzer::MakeStatistics() {
520 for (int type
= 0; type
< kTaTypeNum
; type
++) {
521 if (!ta_
[type
].enabled
) {
524 for (auto& stat
: ta_
[type
].stats
) {
525 stat
.second
.a_key_id
= 0;
526 for (auto& record
: stat
.second
.a_key_stats
) {
527 record
.second
.key_id
= stat
.second
.a_key_id
;
528 stat
.second
.a_key_id
++;
529 if (record
.second
.access_count
<=
530 static_cast<uint64_t>(FLAGS_output_ignore_count
)) {
534 // Generate the key access count distribution data
535 if (FLAGS_output_access_count_stats
) {
536 if (stat
.second
.a_count_stats
.find(record
.second
.access_count
) ==
537 stat
.second
.a_count_stats
.end()) {
538 stat
.second
.a_count_stats
[record
.second
.access_count
] = 1;
540 stat
.second
.a_count_stats
[record
.second
.access_count
]++;
544 // Generate the key size distribution data
545 if (FLAGS_output_key_distribution
) {
546 if (stat
.second
.a_key_size_stats
.find(record
.first
.size()) ==
547 stat
.second
.a_key_size_stats
.end()) {
548 stat
.second
.a_key_size_stats
[record
.first
.size()] = 1;
550 stat
.second
.a_key_size_stats
[record
.first
.size()]++;
554 if (!FLAGS_print_correlation
.empty()) {
555 s
= MakeStatisticCorrelation(stat
.second
, record
.second
);
562 // Output the prefix cut or the whole content of the accessed key space
563 if (FLAGS_output_key_stats
|| FLAGS_output_prefix_cut
> 0) {
564 s
= MakeStatisticKeyStatsOrPrefix(stat
.second
);
570 // output the access count distribution
571 if (FLAGS_output_access_count_stats
&& stat
.second
.a_count_dist_f
) {
572 for (auto& record
: stat
.second
.a_count_stats
) {
573 ret
= snprintf(buffer_
, sizeof(buffer_
),
574 "access_count: %" PRIu64
" num: %" PRIu64
"\n",
575 record
.first
, record
.second
);
577 return Status::IOError("Format the output failed");
579 std::string
printout(buffer_
);
580 s
= stat
.second
.a_count_dist_f
->Append(printout
);
582 fprintf(stderr
, "Write access count distribution file failed\n");
588 // find the medium of the key size
589 uint64_t k_count
= 0;
590 bool get_mid
= false;
591 for (auto& record
: stat
.second
.a_key_size_stats
) {
592 k_count
+= record
.second
;
593 if (!get_mid
&& k_count
>= stat
.second
.a_key_mid
) {
594 stat
.second
.a_key_mid
= record
.first
;
597 if (FLAGS_output_key_distribution
&& stat
.second
.a_key_size_f
) {
598 ret
= snprintf(buffer_
, sizeof(buffer_
), "%" PRIu64
" %" PRIu64
"\n",
599 record
.first
, record
.second
);
601 return Status::IOError("Format output failed");
603 std::string
printout(buffer_
);
604 s
= stat
.second
.a_key_size_f
->Append(printout
);
606 fprintf(stderr
, "Write key size distribution file failed\n");
612 // output the value size distribution
613 uint64_t v_begin
= 0, v_end
= 0, v_count
= 0;
615 for (auto& record
: stat
.second
.a_value_size_stats
) {
617 v_end
= (record
.first
+ 1) * FLAGS_value_interval
;
618 v_count
+= record
.second
;
619 if (!get_mid
&& v_count
>= stat
.second
.a_count
/ 2) {
620 stat
.second
.a_value_mid
= (v_begin
+ v_end
) / 2;
623 if (FLAGS_output_value_distribution
&& stat
.second
.a_value_size_f
&&
624 (type
== TraceOperationType::kPut
||
625 type
== TraceOperationType::kMerge
)) {
626 ret
= snprintf(buffer_
, sizeof(buffer_
),
627 "Number_of_value_size_between %" PRIu64
" and %" PRIu64
628 " is: %" PRIu64
"\n",
629 v_begin
, v_end
, record
.second
);
631 return Status::IOError("Format output failed");
633 std::string
printout(buffer_
);
634 s
= stat
.second
.a_value_size_f
->Append(printout
);
636 fprintf(stderr
, "Write value size distribution file failed\n");
644 // Make the QPS statistics
645 if (FLAGS_output_qps_stats
) {
646 s
= MakeStatisticQPS();
655 // Process the statistics of the key access and
656 // prefix of the accessed keys if required
657 Status
TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats
& stats
) {
660 std::string prefix
= "0";
661 uint64_t prefix_access
= 0;
662 uint64_t prefix_count
= 0;
663 uint64_t prefix_succ_access
= 0;
664 double prefix_ave_access
= 0.0;
665 stats
.a_succ_count
= 0;
666 for (auto& record
: stats
.a_key_stats
) {
667 // write the key access statistic file
668 if (!stats
.a_key_f
) {
669 return Status::IOError("Failed to open accessed_key_stats file.");
671 stats
.a_succ_count
+= record
.second
.succ_count
;
672 double succ_ratio
= 0.0;
673 if (record
.second
.access_count
> 0) {
674 succ_ratio
= (static_cast<double>(record
.second
.succ_count
)) /
675 record
.second
.access_count
;
677 ret
= snprintf(buffer_
, sizeof(buffer_
),
678 "%u %zu %" PRIu64
" %" PRIu64
" %f\n", record
.second
.cf_id
,
679 record
.second
.value_size
, record
.second
.key_id
,
680 record
.second
.access_count
, succ_ratio
);
682 return Status::IOError("Format output failed");
684 std::string
printout(buffer_
);
685 s
= stats
.a_key_f
->Append(printout
);
687 fprintf(stderr
, "Write key access file failed\n");
691 // write the prefix cut of the accessed keys
692 if (FLAGS_output_prefix_cut
> 0 && stats
.a_prefix_cut_f
) {
693 if (record
.first
.compare(0, FLAGS_output_prefix_cut
, prefix
) != 0) {
694 std::string prefix_out
=
695 ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix
);
696 if (prefix_count
== 0) {
697 prefix_ave_access
= 0.0;
700 (static_cast<double>(prefix_access
)) / prefix_count
;
702 double prefix_succ_ratio
= 0.0;
703 if (prefix_access
> 0) {
705 (static_cast<double>(prefix_succ_access
)) / prefix_access
;
708 snprintf(buffer_
, sizeof(buffer_
),
709 "%" PRIu64
" %" PRIu64
" %" PRIu64
" %f %f %s\n",
710 record
.second
.key_id
, prefix_access
, prefix_count
,
711 prefix_ave_access
, prefix_succ_ratio
, prefix_out
.c_str());
713 return Status::IOError("Format output failed");
715 std::string
pout(buffer_
);
716 s
= stats
.a_prefix_cut_f
->Append(pout
);
718 fprintf(stderr
, "Write accessed key prefix file failed\n");
722 // make the top k statistic for the prefix
723 if (static_cast<int32_t>(stats
.top_k_prefix_access
.size()) <
724 FLAGS_print_top_k_access
) {
725 stats
.top_k_prefix_access
.push(
726 std::make_pair(prefix_access
, prefix_out
));
728 if (prefix_access
> stats
.top_k_prefix_access
.top().first
) {
729 stats
.top_k_prefix_access
.pop();
730 stats
.top_k_prefix_access
.push(
731 std::make_pair(prefix_access
, prefix_out
));
735 if (static_cast<int32_t>(stats
.top_k_prefix_ave
.size()) <
736 FLAGS_print_top_k_access
) {
737 stats
.top_k_prefix_ave
.push(
738 std::make_pair(prefix_ave_access
, prefix_out
));
740 if (prefix_ave_access
> stats
.top_k_prefix_ave
.top().first
) {
741 stats
.top_k_prefix_ave
.pop();
742 stats
.top_k_prefix_ave
.push(
743 std::make_pair(prefix_ave_access
, prefix_out
));
747 prefix
= record
.first
.substr(0, FLAGS_output_prefix_cut
);
750 prefix_succ_access
= 0;
752 prefix_access
+= record
.second
.access_count
;
754 prefix_succ_access
+= record
.second
.succ_count
;
760 // Process the statistics of different query type
762 Status
TraceAnalyzer::MakeStatisticCorrelation(TraceStats
& stats
,
764 if (stats
.correlation_output
.size() !=
765 analyzer_opts_
.correlation_list
.size()) {
766 return Status::Corruption("Cannot make the statistic of correlation.");
769 for (int i
= 0; i
< static_cast<int>(analyzer_opts_
.correlation_list
.size());
771 if (i
>= static_cast<int>(stats
.correlation_output
.size()) ||
772 i
>= static_cast<int>(unit
.v_correlation
.size())) {
775 stats
.correlation_output
[i
].first
+= unit
.v_correlation
[i
].count
;
776 stats
.correlation_output
[i
].second
+= unit
.v_correlation
[i
].total_ts
;
781 // Process the statistics of QPS
782 Status
TraceAnalyzer::MakeStatisticQPS() {
783 if(begin_time_
== 0) {
784 begin_time_
= trace_create_time_
;
787 static_cast<uint32_t>((end_time_
- begin_time_
) / 1000000);
790 std::vector
<std::vector
<uint32_t>> type_qps(
791 duration
, std::vector
<uint32_t>(kTaTypeNum
+ 1, 0));
792 std::vector
<uint64_t> qps_sum(kTaTypeNum
+ 1, 0);
793 std::vector
<uint32_t> qps_peak(kTaTypeNum
+ 1, 0);
794 qps_ave_
.resize(kTaTypeNum
+ 1);
796 for (int type
= 0; type
< kTaTypeNum
; type
++) {
797 if (!ta_
[type
].enabled
) {
800 for (auto& stat
: ta_
[type
].stats
) {
801 uint32_t time_line
= 0;
802 uint64_t cf_qps_sum
= 0;
803 for (auto& time_it
: stat
.second
.a_qps_stats
) {
804 if (time_it
.first
>= duration
) {
807 type_qps
[time_it
.first
][kTaTypeNum
] += time_it
.second
;
808 type_qps
[time_it
.first
][type
] += time_it
.second
;
809 cf_qps_sum
+= time_it
.second
;
810 if (time_it
.second
> stat
.second
.a_peak_qps
) {
811 stat
.second
.a_peak_qps
= time_it
.second
;
813 if (stat
.second
.a_qps_f
) {
814 while (time_line
< time_it
.first
) {
815 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u\n", 0);
817 return Status::IOError("Format the output failed");
819 std::string
printout(buffer_
);
820 s
= stat
.second
.a_qps_f
->Append(printout
);
822 fprintf(stderr
, "Write QPS file failed\n");
827 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u\n", time_it
.second
);
829 return Status::IOError("Format the output failed");
831 std::string
printout(buffer_
);
832 s
= stat
.second
.a_qps_f
->Append(printout
);
834 fprintf(stderr
, "Write QPS file failed\n");
837 if (time_line
== time_it
.first
) {
842 // Process the top k QPS peaks
843 if (FLAGS_output_prefix_cut
> 0) {
844 if (static_cast<int32_t>(stat
.second
.top_k_qps_sec
.size()) <
845 FLAGS_print_top_k_access
) {
846 stat
.second
.top_k_qps_sec
.push(
847 std::make_pair(time_it
.second
, time_it
.first
));
849 if (stat
.second
.top_k_qps_sec
.size() > 0 &&
850 stat
.second
.top_k_qps_sec
.top().first
< time_it
.second
) {
851 stat
.second
.top_k_qps_sec
.pop();
852 stat
.second
.top_k_qps_sec
.push(
853 std::make_pair(time_it
.second
, time_it
.first
));
859 stat
.second
.a_ave_qps
= 0;
861 stat
.second
.a_ave_qps
= (static_cast<double>(cf_qps_sum
)) / duration
;
864 // Output the accessed unique key number change overtime
865 if (stat
.second
.a_key_num_f
) {
866 uint64_t cur_uni_key
=
867 static_cast<uint64_t>(stat
.second
.a_key_stats
.size());
868 double cur_ratio
= 0.0;
869 uint64_t cur_num
= 0;
870 for (uint32_t i
= 0; i
< duration
; i
++) {
871 auto find_time
= stat
.second
.uni_key_num
.find(i
);
872 if (find_time
!= stat
.second
.uni_key_num
.end()) {
873 cur_ratio
= (static_cast<double>(find_time
->second
)) / cur_uni_key
;
874 cur_num
= find_time
->second
;
876 ret
= snprintf(buffer_
, sizeof(buffer_
), "%" PRIu64
" %.12f\n",
879 return Status::IOError("Format the output failed");
881 std::string
printout(buffer_
);
882 s
= stat
.second
.a_key_num_f
->Append(printout
);
885 "Write accessed unique key number change file failed\n");
891 // output the prefix of top k access peak
892 if (FLAGS_output_prefix_cut
> 0 && stat
.second
.a_top_qps_prefix_f
) {
893 while (!stat
.second
.top_k_qps_sec
.empty()) {
894 ret
= snprintf(buffer_
, sizeof(buffer_
), "At time: %u with QPS: %u\n",
895 stat
.second
.top_k_qps_sec
.top().second
,
896 stat
.second
.top_k_qps_sec
.top().first
);
898 return Status::IOError("Format the output failed");
900 std::string
printout(buffer_
);
901 s
= stat
.second
.a_top_qps_prefix_f
->Append(printout
);
903 fprintf(stderr
, "Write prefix QPS top K file failed\n");
906 uint32_t qps_time
= stat
.second
.top_k_qps_sec
.top().second
;
907 stat
.second
.top_k_qps_sec
.pop();
908 if (stat
.second
.a_qps_prefix_stats
.find(qps_time
) !=
909 stat
.second
.a_qps_prefix_stats
.end()) {
910 for (auto& qps_prefix
: stat
.second
.a_qps_prefix_stats
[qps_time
]) {
911 std::string qps_prefix_out
=
912 ROCKSDB_NAMESPACE::LDBCommand::StringToHex(qps_prefix
.first
);
913 ret
= snprintf(buffer_
, sizeof(buffer_
),
914 "The prefix: %s Access count: %u\n",
915 qps_prefix_out
.c_str(), qps_prefix
.second
);
917 return Status::IOError("Format the output failed");
919 std::string
pout(buffer_
);
920 s
= stat
.second
.a_top_qps_prefix_f
->Append(pout
);
922 fprintf(stderr
, "Write prefix QPS top K file failed\n");
933 for (uint32_t i
= 0; i
< duration
; i
++) {
934 for (int type
= 0; type
<= kTaTypeNum
; type
++) {
935 if (type
< kTaTypeNum
) {
936 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u ", type_qps
[i
][type
]);
938 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u\n", type_qps
[i
][type
]);
941 return Status::IOError("Format the output failed");
943 std::string
printout(buffer_
);
944 s
= qps_f_
->Append(printout
);
948 qps_sum
[type
] += type_qps
[i
][type
];
949 if (type_qps
[i
][type
] > qps_peak
[type
]) {
950 qps_peak
[type
] = type_qps
[i
][type
];
957 int cfs_size
= static_cast<uint32_t>(cfs_
.size());
959 for (uint32_t i
= 0; i
< duration
; i
++) {
960 for (int cf
= 0; cf
< cfs_size
; cf
++) {
961 if (cfs_
[cf
].cf_qps
.find(i
) != cfs_
[cf
].cf_qps
.end()) {
962 v
= cfs_
[cf
].cf_qps
[i
];
966 if (cf
< cfs_size
- 1) {
967 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u ", v
);
969 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u\n", v
);
972 return Status::IOError("Format the output failed");
974 std::string
printout(buffer_
);
975 s
= cf_qps_f_
->Append(printout
);
983 qps_peak_
= qps_peak
;
984 for (int type
= 0; type
<= kTaTypeNum
; type
++) {
988 qps_ave_
[type
] = (static_cast<double>(qps_sum
[type
])) / duration
;
995 // In reprocessing, if we have the whole key space
996 // we can output the access count of all keys in a cf
997 // we can make some statistics of the whole key space
998 // also, we output the top k accessed keys here
999 Status
TraceAnalyzer::ReProcessing() {
1002 for (auto& cf_it
: cfs_
) {
1003 uint32_t cf_id
= cf_it
.first
;
1005 // output the time series;
1006 if (FLAGS_output_time_series
) {
1007 for (int type
= 0; type
< kTaTypeNum
; type
++) {
1008 if (!ta_
[type
].enabled
||
1009 ta_
[type
].stats
.find(cf_id
) == ta_
[type
].stats
.end()) {
1012 TraceStats
& stat
= ta_
[type
].stats
[cf_id
];
1013 if (!stat
.time_series_f
) {
1014 fprintf(stderr
, "Cannot write time_series of '%s' in '%u'\n",
1015 ta_
[type
].type_name
.c_str(), cf_id
);
1018 while (!stat
.time_series
.empty()) {
1019 uint64_t key_id
= 0;
1020 auto found
= stat
.a_key_stats
.find(stat
.time_series
.front().key
);
1021 if (found
!= stat
.a_key_stats
.end()) {
1022 key_id
= found
->second
.key_id
;
1025 snprintf(buffer_
, sizeof(buffer_
), "%u %" PRIu64
" %" PRIu64
"\n",
1026 stat
.time_series
.front().type
,
1027 stat
.time_series
.front().ts
, key_id
);
1029 return Status::IOError("Format the output failed");
1031 std::string
printout(buffer_
);
1032 s
= stat
.time_series_f
->Append(printout
);
1034 fprintf(stderr
, "Write time series file failed\n");
1037 stat
.time_series
.pop_front();
1042 // process the whole key space if needed
1043 if (!FLAGS_key_space_dir
.empty()) {
1044 std::string whole_key_path
=
1045 FLAGS_key_space_dir
+ "/" + std::to_string(cf_id
) + ".txt";
1046 std::string input_key
, get_key
;
1047 std::vector
<std::string
> prefix(kTaTypeNum
);
1048 std::istringstream iss
;
1049 bool has_data
= true;
1050 std::unique_ptr
<SequentialFile
> wkey_input_f
;
1052 s
= env_
->NewSequentialFile(whole_key_path
, &wkey_input_f
, env_options_
);
1054 fprintf(stderr
, "Cannot open the whole key space file of CF: %u\n",
1056 wkey_input_f
.reset();
1060 std::unique_ptr
<FSSequentialFile
> file
;
1061 file
= NewLegacySequentialFileWrapper(wkey_input_f
);
1062 size_t kTraceFileReadaheadSize
= 2 * 1024 * 1024;
1063 SequentialFileReader
sf_reader(
1064 std::move(file
), whole_key_path
,
1065 kTraceFileReadaheadSize
/* filereadahead_size */);
1066 for (cfs_
[cf_id
].w_count
= 0;
1067 ReadOneLine(&iss
, &sf_reader
, &get_key
, &has_data
, &s
);
1068 ++cfs_
[cf_id
].w_count
) {
1070 fprintf(stderr
, "Read whole key space file failed\n");
1074 input_key
= ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key
);
1075 for (int type
= 0; type
< kTaTypeNum
; type
++) {
1076 if (!ta_
[type
].enabled
) {
1079 TraceStats
& stat
= ta_
[type
].stats
[cf_id
];
1081 if (stat
.a_key_stats
.find(input_key
) != stat
.a_key_stats
.end()) {
1082 ret
= snprintf(buffer_
, sizeof(buffer_
),
1083 "%" PRIu64
" %" PRIu64
"\n", cfs_
[cf_id
].w_count
,
1084 stat
.a_key_stats
[input_key
].access_count
);
1086 return Status::IOError("Format the output failed");
1088 std::string
printout(buffer_
);
1089 s
= stat
.w_key_f
->Append(printout
);
1091 fprintf(stderr
, "Write whole key space access file failed\n");
1097 // Output the prefix cut file of the whole key space
1098 if (FLAGS_output_prefix_cut
> 0 && stat
.w_prefix_cut_f
) {
1099 if (input_key
.compare(0, FLAGS_output_prefix_cut
, prefix
[type
]) !=
1101 prefix
[type
] = input_key
.substr(0, FLAGS_output_prefix_cut
);
1102 std::string prefix_out
=
1103 ROCKSDB_NAMESPACE::LDBCommand::StringToHex(prefix
[type
]);
1104 ret
= snprintf(buffer_
, sizeof(buffer_
), "%" PRIu64
" %s\n",
1105 cfs_
[cf_id
].w_count
, prefix_out
.c_str());
1107 return Status::IOError("Format the output failed");
1109 std::string
printout(buffer_
);
1110 s
= stat
.w_prefix_cut_f
->Append(printout
);
1113 "Write whole key space prefix cut file failed\n");
1120 // Make the statistics fo the key size distribution
1121 if (FLAGS_output_key_distribution
) {
1122 if (cfs_
[cf_id
].w_key_size_stats
.find(input_key
.size()) ==
1123 cfs_
[cf_id
].w_key_size_stats
.end()) {
1124 cfs_
[cf_id
].w_key_size_stats
[input_key
.size()] = 1;
1126 cfs_
[cf_id
].w_key_size_stats
[input_key
.size()]++;
1133 // process the top k accessed keys
1134 if (FLAGS_print_top_k_access
> 0) {
1135 for (int type
= 0; type
< kTaTypeNum
; type
++) {
1136 if (!ta_
[type
].enabled
||
1137 ta_
[type
].stats
.find(cf_id
) == ta_
[type
].stats
.end()) {
1140 TraceStats
& stat
= ta_
[type
].stats
[cf_id
];
1141 for (auto& record
: stat
.a_key_stats
) {
1142 if (static_cast<int32_t>(stat
.top_k_queue
.size()) <
1143 FLAGS_print_top_k_access
) {
1144 stat
.top_k_queue
.push(
1145 std::make_pair(record
.second
.access_count
, record
.first
));
1147 if (record
.second
.access_count
> stat
.top_k_queue
.top().first
) {
1148 stat
.top_k_queue
.pop();
1149 stat
.top_k_queue
.push(
1150 std::make_pair(record
.second
.access_count
, record
.first
));
1157 return Status::OK();
1160 // End the processing, print the requested results
1161 Status
TraceAnalyzer::EndProcessing() {
1163 if (trace_sequence_f_
) {
1164 s
= trace_sequence_f_
->Close();
1166 if (FLAGS_no_print
) {
1171 s
= CloseOutputFiles();
1176 // Insert the corresponding key statistics to the correct type
1177 // and correct CF, output the time-series file if needed
1178 Status
TraceAnalyzer::KeyStatsInsertion(const uint32_t& type
,
1179 const uint32_t& cf_id
,
1180 const std::string
& key
,
1181 const size_t value_size
,
1182 const uint64_t ts
) {
1187 unit
.value_size
= value_size
;
1188 unit
.access_count
= 1;
1189 unit
.latest_ts
= ts
;
1190 if (type
!= TraceOperationType::kGet
|| value_size
> 0) {
1191 unit
.succ_count
= 1;
1193 unit
.succ_count
= 0;
1195 unit
.v_correlation
.resize(analyzer_opts_
.correlation_list
.size());
1197 i
< (static_cast<int>(analyzer_opts_
.correlation_list
.size())); i
++) {
1198 unit
.v_correlation
[i
].count
= 0;
1199 unit
.v_correlation
[i
].total_ts
= 0;
1202 if (FLAGS_output_prefix_cut
> 0) {
1203 prefix
= key
.substr(0, FLAGS_output_prefix_cut
);
1206 if (begin_time_
== 0) {
1209 uint32_t time_in_sec
;
1210 if (ts
< begin_time_
) {
1213 time_in_sec
= static_cast<uint32_t>((ts
- begin_time_
) / 1000000);
1216 uint64_t dist_value_size
= value_size
/ FLAGS_value_interval
;
1217 auto found_stats
= ta_
[type
].stats
.find(cf_id
);
1218 if (found_stats
== ta_
[type
].stats
.end()) {
1219 ta_
[type
].stats
[cf_id
].cf_id
= cf_id
;
1220 ta_
[type
].stats
[cf_id
].cf_name
= std::to_string(cf_id
);
1221 ta_
[type
].stats
[cf_id
].a_count
= 1;
1222 ta_
[type
].stats
[cf_id
].a_key_id
= 0;
1223 ta_
[type
].stats
[cf_id
].a_key_size_sqsum
= MultiplyCheckOverflow(
1224 static_cast<uint64_t>(key
.size()), static_cast<uint64_t>(key
.size()));
1225 ta_
[type
].stats
[cf_id
].a_key_size_sum
= key
.size();
1226 ta_
[type
].stats
[cf_id
].a_value_size_sqsum
= MultiplyCheckOverflow(
1227 static_cast<uint64_t>(value_size
), static_cast<uint64_t>(value_size
));
1228 ta_
[type
].stats
[cf_id
].a_value_size_sum
= value_size
;
1229 s
= OpenStatsOutputFiles(ta_
[type
].type_name
, ta_
[type
].stats
[cf_id
]);
1230 if (!FLAGS_print_correlation
.empty()) {
1231 s
= StatsUnitCorrelationUpdate(unit
, type
, ts
, key
);
1233 ta_
[type
].stats
[cf_id
].a_key_stats
[key
] = unit
;
1234 ta_
[type
].stats
[cf_id
].a_value_size_stats
[dist_value_size
] = 1;
1235 ta_
[type
].stats
[cf_id
].a_qps_stats
[time_in_sec
] = 1;
1236 ta_
[type
].stats
[cf_id
].correlation_output
.resize(
1237 analyzer_opts_
.correlation_list
.size());
1238 if (FLAGS_output_prefix_cut
> 0) {
1239 std::map
<std::string
, uint32_t> tmp_qps_map
;
1240 tmp_qps_map
[prefix
] = 1;
1241 ta_
[type
].stats
[cf_id
].a_qps_prefix_stats
[time_in_sec
] = tmp_qps_map
;
1243 if (time_in_sec
!= cur_time_sec_
) {
1244 ta_
[type
].stats
[cf_id
].uni_key_num
[cur_time_sec_
] =
1245 static_cast<uint64_t>(ta_
[type
].stats
[cf_id
].a_key_stats
.size());
1246 cur_time_sec_
= time_in_sec
;
1249 found_stats
->second
.a_count
++;
1250 found_stats
->second
.a_key_size_sqsum
+= MultiplyCheckOverflow(
1251 static_cast<uint64_t>(key
.size()), static_cast<uint64_t>(key
.size()));
1252 found_stats
->second
.a_key_size_sum
+= key
.size();
1253 found_stats
->second
.a_value_size_sqsum
+= MultiplyCheckOverflow(
1254 static_cast<uint64_t>(value_size
), static_cast<uint64_t>(value_size
));
1255 found_stats
->second
.a_value_size_sum
+= value_size
;
1256 auto found_key
= found_stats
->second
.a_key_stats
.find(key
);
1257 if (found_key
== found_stats
->second
.a_key_stats
.end()) {
1258 found_stats
->second
.a_key_stats
[key
] = unit
;
1260 found_key
->second
.access_count
++;
1261 if (type
!= TraceOperationType::kGet
|| value_size
> 0) {
1262 found_key
->second
.succ_count
++;
1264 if (!FLAGS_print_correlation
.empty()) {
1265 s
= StatsUnitCorrelationUpdate(found_key
->second
, type
, ts
, key
);
1268 if (time_in_sec
!= cur_time_sec_
) {
1269 found_stats
->second
.uni_key_num
[cur_time_sec_
] =
1270 static_cast<uint64_t>(found_stats
->second
.a_key_stats
.size());
1271 cur_time_sec_
= time_in_sec
;
1275 found_stats
->second
.a_value_size_stats
.find(dist_value_size
);
1276 if (found_value
== found_stats
->second
.a_value_size_stats
.end()) {
1277 found_stats
->second
.a_value_size_stats
[dist_value_size
] = 1;
1279 found_value
->second
++;
1282 auto found_qps
= found_stats
->second
.a_qps_stats
.find(time_in_sec
);
1283 if (found_qps
== found_stats
->second
.a_qps_stats
.end()) {
1284 found_stats
->second
.a_qps_stats
[time_in_sec
] = 1;
1286 found_qps
->second
++;
1289 if (FLAGS_output_prefix_cut
> 0) {
1290 auto found_qps_prefix
=
1291 found_stats
->second
.a_qps_prefix_stats
.find(time_in_sec
);
1292 if (found_qps_prefix
== found_stats
->second
.a_qps_prefix_stats
.end()) {
1293 std::map
<std::string
, uint32_t> tmp_qps_map
;
1294 found_stats
->second
.a_qps_prefix_stats
[time_in_sec
] = tmp_qps_map
;
1296 if (found_stats
->second
.a_qps_prefix_stats
[time_in_sec
].find(prefix
) ==
1297 found_stats
->second
.a_qps_prefix_stats
[time_in_sec
].end()) {
1298 found_stats
->second
.a_qps_prefix_stats
[time_in_sec
][prefix
] = 1;
1300 found_stats
->second
.a_qps_prefix_stats
[time_in_sec
][prefix
]++;
1305 if (cfs_
.find(cf_id
) == cfs_
.end()) {
1307 cf_unit
.cf_id
= cf_id
;
1308 cf_unit
.w_count
= 0;
1309 cf_unit
.a_count
= 0;
1310 cfs_
[cf_id
] = cf_unit
;
1313 if (FLAGS_output_qps_stats
) {
1314 cfs_
[cf_id
].cf_qps
[time_in_sec
]++;
1317 if (FLAGS_output_time_series
) {
1319 trace_u
.type
= type
;
1321 trace_u
.value_size
= value_size
;
1322 trace_u
.ts
= (ts
- time_series_start_
) / 1000000;
1323 trace_u
.cf_id
= cf_id
;
1324 ta_
[type
].stats
[cf_id
].time_series
.push_back(trace_u
);
1330 // Update the correlation unit of each key if enabled
1331 Status
TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit
& unit
,
1332 const uint32_t& type_second
,
1334 const std::string
& key
) {
1335 if (type_second
>= kTaTypeNum
) {
1336 fprintf(stderr
, "Unknown Type Id: %u\n", type_second
);
1337 return Status::NotFound();
1340 for (int type_first
= 0; type_first
< kTaTypeNum
; type_first
++) {
1341 if (type_first
>= static_cast<int>(ta_
.size()) ||
1342 type_first
>= static_cast<int>(analyzer_opts_
.correlation_map
.size())) {
1345 if (analyzer_opts_
.correlation_map
[type_first
][type_second
] < 0 ||
1346 ta_
[type_first
].stats
.find(unit
.cf_id
) == ta_
[type_first
].stats
.end() ||
1347 ta_
[type_first
].stats
[unit
.cf_id
].a_key_stats
.find(key
) ==
1348 ta_
[type_first
].stats
[unit
.cf_id
].a_key_stats
.end() ||
1349 ta_
[type_first
].stats
[unit
.cf_id
].a_key_stats
[key
].latest_ts
== ts
) {
1353 int correlation_id
=
1354 analyzer_opts_
.correlation_map
[type_first
][type_second
];
1356 // after get the x-y operation time or x, update;
1357 if (correlation_id
< 0 ||
1358 correlation_id
>= static_cast<int>(unit
.v_correlation
.size())) {
1361 unit
.v_correlation
[correlation_id
].count
++;
1362 unit
.v_correlation
[correlation_id
].total_ts
+=
1363 (ts
- ta_
[type_first
].stats
[unit
.cf_id
].a_key_stats
[key
].latest_ts
);
1366 unit
.latest_ts
= ts
;
1367 return Status::OK();
1370 // when a new trace statistic is created, the file handler
1371 // pointers should be initiated if needed according to
1372 // the trace analyzer options
1373 Status
TraceAnalyzer::OpenStatsOutputFiles(const std::string
& type
,
1374 TraceStats
& new_stats
) {
1376 if (FLAGS_output_key_stats
) {
1377 s
= CreateOutputFile(type
, new_stats
.cf_name
, "accessed_key_stats.txt",
1378 &new_stats
.a_key_f
);
1379 s
= CreateOutputFile(type
, new_stats
.cf_name
,
1380 "accessed_unique_key_num_change.txt",
1381 &new_stats
.a_key_num_f
);
1382 if (!FLAGS_key_space_dir
.empty()) {
1383 s
= CreateOutputFile(type
, new_stats
.cf_name
, "whole_key_stats.txt",
1384 &new_stats
.w_key_f
);
1388 if (FLAGS_output_access_count_stats
) {
1389 s
= CreateOutputFile(type
, new_stats
.cf_name
,
1390 "accessed_key_count_distribution.txt",
1391 &new_stats
.a_count_dist_f
);
1394 if (FLAGS_output_prefix_cut
> 0) {
1395 s
= CreateOutputFile(type
, new_stats
.cf_name
, "accessed_key_prefix_cut.txt",
1396 &new_stats
.a_prefix_cut_f
);
1397 if (!FLAGS_key_space_dir
.empty()) {
1398 s
= CreateOutputFile(type
, new_stats
.cf_name
, "whole_key_prefix_cut.txt",
1399 &new_stats
.w_prefix_cut_f
);
1402 if (FLAGS_output_qps_stats
) {
1403 s
= CreateOutputFile(type
, new_stats
.cf_name
,
1404 "accessed_top_k_qps_prefix_cut.txt",
1405 &new_stats
.a_top_qps_prefix_f
);
1409 if (FLAGS_output_time_series
) {
1410 s
= CreateOutputFile(type
, new_stats
.cf_name
, "time_series.txt",
1411 &new_stats
.time_series_f
);
1414 if (FLAGS_output_value_distribution
) {
1415 s
= CreateOutputFile(type
, new_stats
.cf_name
,
1416 "accessed_value_size_distribution.txt",
1417 &new_stats
.a_value_size_f
);
1420 if (FLAGS_output_key_distribution
) {
1421 s
= CreateOutputFile(type
, new_stats
.cf_name
,
1422 "accessed_key_size_distribution.txt",
1423 &new_stats
.a_key_size_f
);
1426 if (FLAGS_output_qps_stats
) {
1427 s
= CreateOutputFile(type
, new_stats
.cf_name
, "qps_stats.txt",
1428 &new_stats
.a_qps_f
);
1434 // create the output path of the files to be opened
1435 Status
TraceAnalyzer::CreateOutputFile(
1436 const std::string
& type
, const std::string
& cf_name
,
1437 const std::string
& ending
,
1438 std::unique_ptr
<ROCKSDB_NAMESPACE::WritableFile
>* f_ptr
) {
1440 path
= output_path_
+ "/" + FLAGS_output_prefix
+ "-" + type
+ "-" + cf_name
+
1443 s
= env_
->NewWritableFile(path
, f_ptr
, env_options_
);
1445 fprintf(stderr
, "Cannot open file: %s\n", path
.c_str());
1448 return Status::OK();
1451 // Close the output files in the TraceStats if they are opened
1452 Status
TraceAnalyzer::CloseOutputFiles() {
1454 for (int type
= 0; type
< kTaTypeNum
; type
++) {
1455 if (!ta_
[type
].enabled
) {
1458 for (auto& stat
: ta_
[type
].stats
) {
1459 if (s
.ok() && stat
.second
.time_series_f
) {
1460 s
= stat
.second
.time_series_f
->Close();
1463 if (s
.ok() && stat
.second
.a_key_f
) {
1464 s
= stat
.second
.a_key_f
->Close();
1467 if (s
.ok() && stat
.second
.a_key_num_f
) {
1468 s
= stat
.second
.a_key_num_f
->Close();
1471 if (s
.ok() && stat
.second
.a_count_dist_f
) {
1472 s
= stat
.second
.a_count_dist_f
->Close();
1475 if (s
.ok() && stat
.second
.a_prefix_cut_f
) {
1476 s
= stat
.second
.a_prefix_cut_f
->Close();
1479 if (s
.ok() && stat
.second
.a_value_size_f
) {
1480 s
= stat
.second
.a_value_size_f
->Close();
1483 if (s
.ok() && stat
.second
.a_key_size_f
) {
1484 s
= stat
.second
.a_key_size_f
->Close();
1487 if (s
.ok() && stat
.second
.a_qps_f
) {
1488 s
= stat
.second
.a_qps_f
->Close();
1491 if (s
.ok() && stat
.second
.a_top_qps_prefix_f
) {
1492 s
= stat
.second
.a_top_qps_prefix_f
->Close();
1495 if (s
.ok() && stat
.second
.w_key_f
) {
1496 s
= stat
.second
.w_key_f
->Close();
1498 if (s
.ok() && stat
.second
.w_prefix_cut_f
) {
1499 s
= stat
.second
.w_prefix_cut_f
->Close();
1506 // Handle the Get request in the trace
1507 Status
TraceAnalyzer::HandleGet(uint32_t column_family_id
,
1508 const std::string
& key
, const uint64_t& ts
,
1509 const uint32_t& get_ret
) {
1511 size_t value_size
= 0;
1512 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1513 s
= WriteTraceSequence(TraceOperationType::kGet
, column_family_id
, key
,
1516 return Status::Corruption("Failed to write the trace sequence to file");
1520 if (ta_
[TraceOperationType::kGet
].sample_count
>= sample_max_
) {
1521 ta_
[TraceOperationType::kGet
].sample_count
= 0;
1523 if (ta_
[TraceOperationType::kGet
].sample_count
> 0) {
1524 ta_
[TraceOperationType::kGet
].sample_count
++;
1525 return Status::OK();
1527 ta_
[TraceOperationType::kGet
].sample_count
++;
1529 if (!ta_
[TraceOperationType::kGet
].enabled
) {
1530 return Status::OK();
1535 s
= KeyStatsInsertion(TraceOperationType::kGet
, column_family_id
, key
,
1538 return Status::Corruption("Failed to insert key statistics");
1543 // Handle the Put request in the write batch of the trace
1544 Status
TraceAnalyzer::HandlePut(uint32_t column_family_id
, const Slice
& key
,
1545 const Slice
& value
) {
1547 size_t value_size
= value
.ToString().size();
1548 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1549 s
= WriteTraceSequence(TraceOperationType::kPut
, column_family_id
,
1550 key
.ToString(), value_size
, c_time_
);
1552 return Status::Corruption("Failed to write the trace sequence to file");
1556 if (ta_
[TraceOperationType::kPut
].sample_count
>= sample_max_
) {
1557 ta_
[TraceOperationType::kPut
].sample_count
= 0;
1559 if (ta_
[TraceOperationType::kPut
].sample_count
> 0) {
1560 ta_
[TraceOperationType::kPut
].sample_count
++;
1561 return Status::OK();
1563 ta_
[TraceOperationType::kPut
].sample_count
++;
1565 if (!ta_
[TraceOperationType::kPut
].enabled
) {
1566 return Status::OK();
1568 s
= KeyStatsInsertion(TraceOperationType::kPut
, column_family_id
,
1569 key
.ToString(), value_size
, c_time_
);
1571 return Status::Corruption("Failed to insert key statistics");
1576 // Handle the Delete request in the write batch of the trace
1577 Status
TraceAnalyzer::HandleDelete(uint32_t column_family_id
,
1580 size_t value_size
= 0;
1581 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1582 s
= WriteTraceSequence(TraceOperationType::kDelete
, column_family_id
,
1583 key
.ToString(), value_size
, c_time_
);
1585 return Status::Corruption("Failed to write the trace sequence to file");
1589 if (ta_
[TraceOperationType::kDelete
].sample_count
>= sample_max_
) {
1590 ta_
[TraceOperationType::kDelete
].sample_count
= 0;
1592 if (ta_
[TraceOperationType::kDelete
].sample_count
> 0) {
1593 ta_
[TraceOperationType::kDelete
].sample_count
++;
1594 return Status::OK();
1596 ta_
[TraceOperationType::kDelete
].sample_count
++;
1598 if (!ta_
[TraceOperationType::kDelete
].enabled
) {
1599 return Status::OK();
1601 s
= KeyStatsInsertion(TraceOperationType::kDelete
, column_family_id
,
1602 key
.ToString(), value_size
, c_time_
);
1604 return Status::Corruption("Failed to insert key statistics");
1609 // Handle the SingleDelete request in the write batch of the trace
1610 Status
TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id
,
1613 size_t value_size
= 0;
1614 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1615 s
= WriteTraceSequence(TraceOperationType::kSingleDelete
, column_family_id
,
1616 key
.ToString(), value_size
, c_time_
);
1618 return Status::Corruption("Failed to write the trace sequence to file");
1622 if (ta_
[TraceOperationType::kSingleDelete
].sample_count
>= sample_max_
) {
1623 ta_
[TraceOperationType::kSingleDelete
].sample_count
= 0;
1625 if (ta_
[TraceOperationType::kSingleDelete
].sample_count
> 0) {
1626 ta_
[TraceOperationType::kSingleDelete
].sample_count
++;
1627 return Status::OK();
1629 ta_
[TraceOperationType::kSingleDelete
].sample_count
++;
1631 if (!ta_
[TraceOperationType::kSingleDelete
].enabled
) {
1632 return Status::OK();
1634 s
= KeyStatsInsertion(TraceOperationType::kSingleDelete
, column_family_id
,
1635 key
.ToString(), value_size
, c_time_
);
1637 return Status::Corruption("Failed to insert key statistics");
1642 // Handle the DeleteRange request in the write batch of the trace
1643 Status
TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id
,
1644 const Slice
& begin_key
,
1645 const Slice
& end_key
) {
1647 size_t value_size
= 0;
1648 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1649 s
= WriteTraceSequence(TraceOperationType::kRangeDelete
, column_family_id
,
1650 begin_key
.ToString(), value_size
, c_time_
);
1652 return Status::Corruption("Failed to write the trace sequence to file");
1656 if (ta_
[TraceOperationType::kRangeDelete
].sample_count
>= sample_max_
) {
1657 ta_
[TraceOperationType::kRangeDelete
].sample_count
= 0;
1659 if (ta_
[TraceOperationType::kRangeDelete
].sample_count
> 0) {
1660 ta_
[TraceOperationType::kRangeDelete
].sample_count
++;
1661 return Status::OK();
1663 ta_
[TraceOperationType::kRangeDelete
].sample_count
++;
1665 if (!ta_
[TraceOperationType::kRangeDelete
].enabled
) {
1666 return Status::OK();
1668 s
= KeyStatsInsertion(TraceOperationType::kRangeDelete
, column_family_id
,
1669 begin_key
.ToString(), value_size
, c_time_
);
1670 s
= KeyStatsInsertion(TraceOperationType::kRangeDelete
, column_family_id
,
1671 end_key
.ToString(), value_size
, c_time_
);
1673 return Status::Corruption("Failed to insert key statistics");
1678 // Handle the Merge request in the write batch of the trace
1679 Status
TraceAnalyzer::HandleMerge(uint32_t column_family_id
, const Slice
& key
,
1680 const Slice
& value
) {
1682 size_t value_size
= value
.ToString().size();
1683 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1684 s
= WriteTraceSequence(TraceOperationType::kMerge
, column_family_id
,
1685 key
.ToString(), value_size
, c_time_
);
1687 return Status::Corruption("Failed to write the trace sequence to file");
1691 if (ta_
[TraceOperationType::kMerge
].sample_count
>= sample_max_
) {
1692 ta_
[TraceOperationType::kMerge
].sample_count
= 0;
1694 if (ta_
[TraceOperationType::kMerge
].sample_count
> 0) {
1695 ta_
[TraceOperationType::kMerge
].sample_count
++;
1696 return Status::OK();
1698 ta_
[TraceOperationType::kMerge
].sample_count
++;
1700 if (!ta_
[TraceOperationType::kMerge
].enabled
) {
1701 return Status::OK();
1703 s
= KeyStatsInsertion(TraceOperationType::kMerge
, column_family_id
,
1704 key
.ToString(), value_size
, c_time_
);
1706 return Status::Corruption("Failed to insert key statistics");
1711 // Handle the Iterator request in the trace
1712 Status
TraceAnalyzer::HandleIter(uint32_t column_family_id
,
1713 const std::string
& key
, const uint64_t& ts
,
1714 TraceType
& trace_type
) {
1716 size_t value_size
= 0;
1718 if (trace_type
== kTraceIteratorSeek
) {
1719 type
= TraceOperationType::kIteratorSeek
;
1720 } else if (trace_type
== kTraceIteratorSeekForPrev
) {
1721 type
= TraceOperationType::kIteratorSeekForPrev
;
1729 if (FLAGS_convert_to_human_readable_trace
&& trace_sequence_f_
) {
1730 s
= WriteTraceSequence(type
, column_family_id
, key
, value_size
, ts
);
1732 return Status::Corruption("Failed to write the trace sequence to file");
1736 if (ta_
[type
].sample_count
>= sample_max_
) {
1737 ta_
[type
].sample_count
= 0;
1739 if (ta_
[type
].sample_count
> 0) {
1740 ta_
[type
].sample_count
++;
1741 return Status::OK();
1743 ta_
[type
].sample_count
++;
1745 if (!ta_
[type
].enabled
) {
1746 return Status::OK();
1748 s
= KeyStatsInsertion(type
, column_family_id
, key
, value_size
, ts
);
1750 return Status::Corruption("Failed to insert key statistics");
1755 // Before the analyzer is closed, the requested general statistic results are
1756 // printed out here. In current stage, these information are not output to
1761 void TraceAnalyzer::PrintStatistics() {
1762 for (int type
= 0; type
< kTaTypeNum
; type
++) {
1763 if (!ta_
[type
].enabled
) {
1766 ta_
[type
].total_keys
= 0;
1767 ta_
[type
].total_access
= 0;
1768 ta_
[type
].total_succ_access
= 0;
1769 printf("\n################# Operation Type: %s #####################\n",
1770 ta_
[type
].type_name
.c_str());
1771 if (qps_ave_
.size() == kTaTypeNum
+ 1) {
1772 printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_
[type
],
1775 for (auto& stat_it
: ta_
[type
].stats
) {
1776 if (stat_it
.second
.a_count
== 0) {
1779 TraceStats
& stat
= stat_it
.second
;
1780 uint64_t total_a_keys
= static_cast<uint64_t>(stat
.a_key_stats
.size());
1781 double key_size_ave
= 0.0;
1782 double value_size_ave
= 0.0;
1783 double key_size_vari
= 0.0;
1784 double value_size_vari
= 0.0;
1785 if (stat
.a_count
> 0) {
1787 (static_cast<double>(stat
.a_key_size_sum
)) / stat
.a_count
;
1789 (static_cast<double>(stat
.a_value_size_sum
)) / stat
.a_count
;
1790 key_size_vari
= std::sqrt((static_cast<double>(stat
.a_key_size_sqsum
)) /
1792 key_size_ave
* key_size_ave
);
1793 value_size_vari
= std::sqrt(
1794 (static_cast<double>(stat
.a_value_size_sqsum
)) / stat
.a_count
-
1795 value_size_ave
* value_size_ave
);
1797 if (value_size_ave
== 0.0) {
1798 stat
.a_value_mid
= 0;
1800 cfs_
[stat
.cf_id
].a_count
+= total_a_keys
;
1801 ta_
[type
].total_keys
+= total_a_keys
;
1802 ta_
[type
].total_access
+= stat
.a_count
;
1803 ta_
[type
].total_succ_access
+= stat
.a_succ_count
;
1804 printf("*********************************************************\n");
1805 printf("colume family id: %u\n", stat
.cf_id
);
1806 printf("Total number of queries to this cf by %s: %" PRIu64
"\n",
1807 ta_
[type
].type_name
.c_str(), stat
.a_count
);
1808 printf("Total unique keys in this cf: %" PRIu64
"\n", total_a_keys
);
1809 printf("Average key size: %f key size medium: %" PRIu64
1810 " Key size Variation: %f\n",
1811 key_size_ave
, stat
.a_key_mid
, key_size_vari
);
1812 if (type
== kPut
|| type
== kMerge
) {
1813 printf("Average value size: %f Value size medium: %" PRIu64
1814 " Value size variation: %f\n",
1815 value_size_ave
, stat
.a_value_mid
, value_size_vari
);
1817 printf("Peak QPS is: %u Average QPS is: %f\n", stat
.a_peak_qps
,
1820 // print the top k accessed key and its access count
1821 if (FLAGS_print_top_k_access
> 0) {
1822 printf("The Top %d keys that are accessed:\n",
1823 FLAGS_print_top_k_access
);
1824 while (!stat
.top_k_queue
.empty()) {
1825 std::string hex_key
= ROCKSDB_NAMESPACE::LDBCommand::StringToHex(
1826 stat
.top_k_queue
.top().second
);
1827 printf("Access_count: %" PRIu64
" %s\n", stat
.top_k_queue
.top().first
,
1829 stat
.top_k_queue
.pop();
1833 // print the top k access prefix range and
1834 // top k prefix range with highest average access per key
1835 if (FLAGS_output_prefix_cut
> 0) {
1836 printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access
);
1837 while (!stat
.top_k_prefix_access
.empty()) {
1838 printf("Prefix: %s Access count: %" PRIu64
"\n",
1839 stat
.top_k_prefix_access
.top().second
.c_str(),
1840 stat
.top_k_prefix_access
.top().first
);
1841 stat
.top_k_prefix_access
.pop();
1844 printf("The Top %d prefix with highest access per key:\n",
1845 FLAGS_print_top_k_access
);
1846 while (!stat
.top_k_prefix_ave
.empty()) {
1847 printf("Prefix: %s access per key: %f\n",
1848 stat
.top_k_prefix_ave
.top().second
.c_str(),
1849 stat
.top_k_prefix_ave
.top().first
);
1850 stat
.top_k_prefix_ave
.pop();
1854 // print the operation correlations
1855 if (!FLAGS_print_correlation
.empty()) {
1856 for (int correlation
= 0;
1858 static_cast<int>(analyzer_opts_
.correlation_list
.size());
1861 "The correlation statistics of '%s' after '%s' is:",
1862 taIndexToOpt
[analyzer_opts_
.correlation_list
[correlation
].second
]
1864 taIndexToOpt
[analyzer_opts_
.correlation_list
[correlation
].first
]
1866 double correlation_ave
= 0.0;
1867 if (stat
.correlation_output
[correlation
].first
> 0) {
1869 (static_cast<double>(
1870 stat
.correlation_output
[correlation
].second
)) /
1871 (stat
.correlation_output
[correlation
].first
* 1000);
1873 printf(" total numbers: %" PRIu64
" average time: %f(ms)\n",
1874 stat
.correlation_output
[correlation
].first
, correlation_ave
);
1878 printf("*********************************************************\n");
1879 printf("Total keys of '%s' is: %" PRIu64
"\n", ta_
[type
].type_name
.c_str(),
1880 ta_
[type
].total_keys
);
1881 printf("Total access is: %" PRIu64
"\n", ta_
[type
].total_access
);
1882 total_access_keys_
+= ta_
[type
].total_keys
;
1885 // Print the overall statistic information of the trace
1886 printf("\n*********************************************************\n");
1887 printf("*********************************************************\n");
1888 printf("The column family based statistics\n");
1889 for (auto& cf
: cfs_
) {
1890 printf("The column family id: %u\n", cf
.first
);
1891 printf("The whole key space key numbers: %" PRIu64
"\n", cf
.second
.w_count
);
1892 printf("The accessed key space key numbers: %" PRIu64
"\n",
1896 if (FLAGS_print_overall_stats
) {
1897 printf("\n*********************************************************\n");
1898 printf("*********************************************************\n");
1899 if (qps_peak_
.size() == kTaTypeNum
+ 1) {
1900 printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_
[kTaTypeNum
],
1901 qps_peak_
[kTaTypeNum
]);
1903 printf("The statistics related to query number need to times: %u\n",
1905 printf("Total_requests: %" PRIu64
" Total_accessed_keys: %" PRIu64
1906 " Total_gets: %" PRIu64
" Total_write_batch: %" PRIu64
"\n",
1907 total_requests_
, total_access_keys_
, total_gets_
, total_writes_
);
1908 for (int type
= 0; type
< kTaTypeNum
; type
++) {
1909 if (!ta_
[type
].enabled
) {
1912 printf("Operation: '%s' has: %" PRIu64
"\n", ta_
[type
].type_name
.c_str(),
1913 ta_
[type
].total_access
);
1918 // Write the trace sequence to file
1919 Status
TraceAnalyzer::WriteTraceSequence(const uint32_t& type
,
1920 const uint32_t& cf_id
,
1921 const std::string
& key
,
1922 const size_t value_size
,
1923 const uint64_t ts
) {
1924 std::string hex_key
= ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key
);
1926 ret
= snprintf(buffer_
, sizeof(buffer_
), "%u %u %zu %" PRIu64
"\n", type
,
1927 cf_id
, value_size
, ts
);
1929 return Status::IOError("failed to format the output");
1931 std::string
printout(buffer_
);
1932 if (!FLAGS_no_key
) {
1933 printout
= hex_key
+ " " + printout
;
1935 return trace_sequence_f_
->Append(printout
);
1938 // The entrance function of Trace_Analyzer
1939 int trace_analyzer_tool(int argc
, char** argv
) {
1940 std::string trace_path
;
1941 std::string output_path
;
1943 AnalyzerOptions analyzer_opts
;
1945 ParseCommandLineFlags(&argc
, &argv
, true);
1947 if (!FLAGS_print_correlation
.empty()) {
1948 analyzer_opts
.SparseCorrelationInput(FLAGS_print_correlation
);
1951 std::unique_ptr
<TraceAnalyzer
> analyzer(
1952 new TraceAnalyzer(FLAGS_trace_path
, FLAGS_output_dir
, analyzer_opts
));
1955 fprintf(stderr
, "Cannot initiate the trace analyzer\n");
1959 ROCKSDB_NAMESPACE::Status s
= analyzer
->PrepareProcessing();
1961 fprintf(stderr
, "%s\n", s
.getState());
1962 fprintf(stderr
, "Cannot initiate the trace reader\n");
1966 s
= analyzer
->StartProcessing();
1967 if (!s
.ok() && !FLAGS_try_process_corrupted_trace
) {
1968 fprintf(stderr
, "%s\n", s
.getState());
1969 fprintf(stderr
, "Cannot processing the trace\n");
1973 s
= analyzer
->MakeStatistics();
1975 fprintf(stderr
, "%s\n", s
.getState());
1976 analyzer
->EndProcessing();
1977 fprintf(stderr
, "Cannot make the statistics\n");
1981 s
= analyzer
->ReProcessing();
1983 fprintf(stderr
, "%s\n", s
.getState());
1984 fprintf(stderr
, "Cannot re-process the trace for more statistics\n");
1985 analyzer
->EndProcessing();
1989 s
= analyzer
->EndProcessing();
1991 fprintf(stderr
, "%s\n", s
.getState());
1992 fprintf(stderr
, "Cannot close the trace analyzer\n");
1998 } // namespace ROCKSDB_NAMESPACE
2000 #endif // Endif of Gflag
2001 #endif // RocksDB LITE