1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6
7#ifndef ROCKSDB_LITE
8
9#ifndef __STDC_FORMAT_MACROS
10#define __STDC_FORMAT_MACROS
11#endif
12
13#ifdef GFLAGS
14#ifdef NUMA
15#include <numa.h>
16#include <numaif.h>
17#endif
18#ifndef OS_WIN
19#include <unistd.h>
20#endif
21
22#include <cinttypes>
23#include <cmath>
24#include <cstdio>
25#include <cstdlib>
26#include <memory>
27#include <sstream>
28#include <stdexcept>
29
30#include "db/db_impl.h"
31#include "db/memtable.h"
32#include "db/write_batch_internal.h"
33#include "options/cf_options.h"
34#include "rocksdb/db.h"
35#include "rocksdb/env.h"
36#include "rocksdb/iterator.h"
37#include "rocksdb/slice.h"
38#include "rocksdb/slice_transform.h"
39#include "rocksdb/status.h"
40#include "rocksdb/table_properties.h"
41#include "rocksdb/utilities/ldb_cmd.h"
42#include "rocksdb/write_batch.h"
43#include "table/meta_blocks.h"
44#include "table/plain_table_factory.h"
45#include "table/table_reader.h"
46#include "tools/trace_analyzer_tool.h"
47#include "util/coding.h"
48#include "util/compression.h"
49#include "util/file_reader_writer.h"
50#include "util/gflags_compat.h"
51#include "util/random.h"
52#include "util/string_util.h"
53#include "util/trace_replay.h"
54
55using GFLAGS_NAMESPACE::ParseCommandLineFlags;
56using GFLAGS_NAMESPACE::RegisterFlagValidator;
57using GFLAGS_NAMESPACE::SetUsageMessage;
58
59DEFINE_string(trace_path, "", "The trace file path.");
60DEFINE_string(output_dir, "", "The directory to store the output files.");
61DEFINE_string(output_prefix, "trace",
62 "The prefix used for all the output files.");
63DEFINE_bool(output_key_stats, false,
64 "Output the key access count statistics to file\n"
65 "for accessed keys:\n"
66 "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n"
67 "Format:[cf_id value_size access_keyid access_count]\n"
68 "for the whole key space keys:\n"
69 "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n"
70 "Format:[whole_key_space_keyid access_count]");
71DEFINE_bool(output_access_count_stats, false,
72 "Output the access count distribution statistics to file.\n"
73 "File name: <prefix>-<query_type>-<cf_id>-accessed_"
74 "key_count_distribution.txt \n"
75 "Format:[access_count number_of_access_count]");
76DEFINE_bool(output_time_series, false,
77 "Output the access time in second of each key, "
78 "such that we can have the time series data of the queries \n"
79 "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n"
80 "Format:[type_id time_in_sec access_keyid].");
DEFINE_bool(try_process_corrupted_trace, false,
            "By default, trace_analyzer will exit if the trace file is "
            "corrupted due to unexpected tracing cases. If this option "
            "is enabled, trace_analyzer will stop reading the trace file "
            "and start analyzing the data read in so far.");
86DEFINE_int32(output_prefix_cut, 0,
87 "The number of bytes as prefix to cut the keys.\n"
88 "If it is enabled, it will generate the following:\n"
89 "For accessed keys:\n"
90 "File name: <prefix>-<query_type>-<cf_id>-"
91 "accessed_key_prefix_cut.txt \n"
92 "Format:[acessed_keyid access_count_of_prefix "
93 "number_of_keys_in_prefix average_key_access "
94 "prefix_succ_ratio prefix]\n"
95 "For whole key space keys:\n"
96 "File name: <prefix>-<query_type>-<cf_id>"
97 "-whole_key_prefix_cut.txt\n"
98 "Format:[start_keyid_in_whole_keyspace prefix]\n"
99 "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
100 "File name: <prefix>-<query_type>-<cf_id>"
101 "-accessed_top_k_qps_prefix_cut.txt\n"
102 "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
103DEFINE_bool(convert_to_human_readable_trace, false,
104 "Convert the binary trace file to a human readable txt file "
105 "for further processing. "
106 "This file will be extremely large "
107 "(similar size as the original binary trace file). "
108 "You can specify 'no_key' to reduce the size, if key is not "
109 "needed in the next step.\n"
110 "File name: <prefix>_human_readable_trace.txt\n"
111 "Format:[type_id cf_id value_size time_in_micorsec <key>].");
112DEFINE_bool(output_qps_stats, false,
113 "Output the query per second(qps) statistics \n"
114 "For the overall qps, it will contain all qps of each query type. "
115 "The time is started from the first trace record\n"
116 "File name: <prefix>_qps_stats.txt\n"
117 "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
118 "For each cf and query, it will have its own qps output.\n"
119 "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n"
120 "Format:[query_count_in_this_second].");
121DEFINE_bool(no_print, false, "Do not print out any result");
122DEFINE_string(
123 print_correlation, "",
124 "intput format: [correlation pairs][.,.]\n"
125 "Output the query correlations between the pairs of query types "
126 "listed in the parameter, input should select the operations from:\n"
127 "get, put, delete, single_delete, rangle_delete, merge. No space "
128 "between the pairs separated by commar. Example: =[get,get]... "
129 "It will print out the number of pairs of 'A after B' and "
130 "the average time interval between the two query.");
131DEFINE_string(key_space_dir, "",
132 "<the directory stores full key space files> \n"
133 "The key space files should be: <column family id>.txt");
134DEFINE_bool(analyze_get, false, "Analyze the Get query.");
135DEFINE_bool(analyze_put, false, "Analyze the Put query.");
136DEFINE_bool(analyze_delete, false, "Analyze the Delete query.");
137DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
138DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
139DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
140DEFINE_bool(analyze_iterator, false,
141 " Analyze the iterate query like seek() and seekForPrev().");
142DEFINE_bool(no_key, false,
143 " Does not output the key to the result files to make smaller.");
144DEFINE_bool(print_overall_stats, true,
145 " Print the stats of the whole trace, "
146 "like total requests, keys, and etc.");
DEFINE_bool(output_key_distribution, false, "Print the key size distribution.");
148DEFINE_bool(
149 output_value_distribution, false,
150 "Out put the value size distribution, only available for Put and Merge.\n"
151 "File name: <prefix>-<query_type>-<cf_id>"
152 "-accessed_value_size_distribution.txt\n"
153 "Format:[Number_of_value_size_between x and "
154 "x+value_interval is: <the count>]");
155DEFINE_int32(print_top_k_access, 1,
156 "<top K of the variables to be printed> "
157 "Print the top k accessed keys, top k accessed prefix "
158 "and etc.");
159DEFINE_int32(output_ignore_count, 0,
160 "<threshold>, ignores the access count <= this value, "
161 "it will shorter the output.");
162DEFINE_int32(value_interval, 8,
163 "To output the value distribution, we need to set the value "
164 "intervals and make the statistic of the value size distribution "
165 "in different intervals. The default is 8.");
166DEFINE_double(sample_ratio, 1.0,
167 "If the trace size is extremely huge or user want to sample "
168 "the trace when analyzing, sample ratio can be set (0, 1.0]");
169
170namespace rocksdb {
171
172std::map<std::string, int> taOptToIndex = {
173 {"get", 0}, {"put", 1},
174 {"delete", 2}, {"single_delete", 3},
175 {"range_delete", 4}, {"merge", 5},
176 {"iterator_Seek", 6}, {"iterator_SeekForPrev", 7}};
177
178std::map<int, std::string> taIndexToOpt = {
179 {0, "get"}, {1, "put"},
180 {2, "delete"}, {3, "single_delete"},
181 {4, "range_delete"}, {5, "merge"},
182 {6, "iterator_Seek"}, {7, "iterator_SeekForPrev"}};
183
184namespace {
185
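// MultiplyCheckOverflow is used below when accumulating the sums of squares of
// key and value sizes (for the variation printed in PrintStatistics); if the
// product would overflow uint64_t, it falls back to returning op1 instead of
// wrapping around.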
186uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
187 if (op1 == 0 || op2 == 0) {
188 return 0;
189 }
190 if (port::kMaxUint64 / op1 < op2) {
191 return op1;
192 }
193 return (op1 * op2);
194}
195
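// The payload of a Get/Iterator trace record starts with a fixed32 column
// family id followed by a length-prefixed key slice; this helper unpacks
// those two fields.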
196void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) {
197 Slice buf(buffer);
198 GetFixed32(&buf, cf_id);
199 GetLengthPrefixedSlice(&buf, key);
200}
201
202} // namespace
203
204// The default constructor of AnalyzerOptions
205AnalyzerOptions::AnalyzerOptions()
206 : correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {}
207
208AnalyzerOptions::~AnalyzerOptions() {}
209
210void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) {
211 std::string cur = in_str;
212 if (cur.size() == 0) {
213 return;
214 }
215 while (!cur.empty()) {
216 if (cur.compare(0, 1, "[") != 0) {
217 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
218 exit(1);
219 }
220 std::string opt1, opt2;
221 std::size_t split = cur.find_first_of(",");
222 if (split != std::string::npos) {
223 opt1 = cur.substr(1, split - 1);
224 } else {
225 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
226 exit(1);
227 }
228 std::size_t end = cur.find_first_of("]");
229 if (end != std::string::npos) {
230 opt2 = cur.substr(split + 1, end - split - 1);
231 } else {
232 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
233 exit(1);
234 }
235 cur = cur.substr(end + 1);
236
237 if (taOptToIndex.find(opt1) != taOptToIndex.end() &&
238 taOptToIndex.find(opt2) != taOptToIndex.end()) {
239 correlation_list.push_back(
240 std::make_pair(taOptToIndex[opt1], taOptToIndex[opt2]));
241 } else {
242 fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
243 exit(1);
244 }
245 }
246
247 int sequence = 0;
248 for (auto& it : correlation_list) {
249 correlation_map[it.first][it.second] = sequence;
250 sequence++;
251 }
252 return;
253}
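
// For illustration: an input such as --print_correlation="[get,put][put,get]"
// is parsed above into correlation_list = {(0, 1), (1, 0)} (using taOptToIndex),
// which in turn sets correlation_map[0][1] = 0 and correlation_map[1][0] = 1.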
254
255// The trace statistic struct constructor
256TraceStats::TraceStats() {
257 cf_id = 0;
258 cf_name = "0";
259 a_count = 0;
260 a_key_id = 0;
261 a_key_size_sqsum = 0;
262 a_key_size_sum = 0;
263 a_key_mid = 0;
264 a_value_size_sqsum = 0;
265 a_value_size_sum = 0;
266 a_value_mid = 0;
267 a_peak_qps = 0;
268 a_ave_qps = 0.0;
269}
270
271TraceStats::~TraceStats() {}
272
273// The trace analyzer constructor
274TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
275 AnalyzerOptions _analyzer_opts)
276 : trace_name_(trace_path),
277 output_path_(output_path),
278 analyzer_opts_(_analyzer_opts) {
279 rocksdb::EnvOptions env_options;
280 env_ = rocksdb::Env::Default();
281 offset_ = 0;
282 c_time_ = 0;
283 total_requests_ = 0;
284 total_access_keys_ = 0;
285 total_gets_ = 0;
286 total_writes_ = 0;
  trace_create_time_ = 0;
288 begin_time_ = 0;
289 end_time_ = 0;
290 time_series_start_ = 0;
291 cur_time_sec_ = 0;
292 if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) {
293 sample_max_ = 1;
294 } else {
295 sample_max_ = static_cast<uint32_t>(1.0 / FLAGS_sample_ratio);
296 }
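  // For example, with --sample_ratio=0.1 this gives sample_max_ = 10, so
  // roughly one out of every 10 requests of each query type is analyzed
  // (see the sample_count checks in the Handle* functions below).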
297
298 ta_.resize(kTaTypeNum);
299 ta_[0].type_name = "get";
300 if (FLAGS_analyze_get) {
301 ta_[0].enabled = true;
302 } else {
303 ta_[0].enabled = false;
304 }
305 ta_[1].type_name = "put";
306 if (FLAGS_analyze_put) {
307 ta_[1].enabled = true;
308 } else {
309 ta_[1].enabled = false;
310 }
311 ta_[2].type_name = "delete";
312 if (FLAGS_analyze_delete) {
313 ta_[2].enabled = true;
314 } else {
315 ta_[2].enabled = false;
316 }
317 ta_[3].type_name = "single_delete";
318 if (FLAGS_analyze_single_delete) {
319 ta_[3].enabled = true;
320 } else {
321 ta_[3].enabled = false;
322 }
323 ta_[4].type_name = "range_delete";
324 if (FLAGS_analyze_range_delete) {
325 ta_[4].enabled = true;
326 } else {
327 ta_[4].enabled = false;
328 }
329 ta_[5].type_name = "merge";
330 if (FLAGS_analyze_merge) {
331 ta_[5].enabled = true;
332 } else {
333 ta_[5].enabled = false;
334 }
335 ta_[6].type_name = "iterator_Seek";
336 if (FLAGS_analyze_iterator) {
337 ta_[6].enabled = true;
338 } else {
339 ta_[6].enabled = false;
340 }
341 ta_[7].type_name = "iterator_SeekForPrev";
342 if (FLAGS_analyze_iterator) {
343 ta_[7].enabled = true;
344 } else {
345 ta_[7].enabled = false;
346 }
347 for (int i = 0; i < kTaTypeNum; i++) {
348 ta_[i].sample_count = 0;
349 }
350}
351
352TraceAnalyzer::~TraceAnalyzer() {}
353
354// Prepare the processing
355// Initiate the global trace reader and writer here
356Status TraceAnalyzer::PrepareProcessing() {
357 Status s;
358 // Prepare the trace reader
359 s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
360 if (!s.ok()) {
361 return s;
362 }
363
364 // Prepare and open the trace sequence file writer if needed
365 if (FLAGS_convert_to_human_readable_trace) {
366 std::string trace_sequence_name;
367 trace_sequence_name =
368 output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt";
369 s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_,
370 env_options_);
371 if (!s.ok()) {
372 return s;
373 }
374 }
375
376 // prepare the general QPS file writer
377 if (FLAGS_output_qps_stats) {
378 std::string qps_stats_name;
379 qps_stats_name =
380 output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt";
381 s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_);
382 if (!s.ok()) {
383 return s;
384 }
385
386 qps_stats_name =
387 output_path_ + "/" + FLAGS_output_prefix + "-cf_qps_stats.txt";
388 s = env_->NewWritableFile(qps_stats_name, &cf_qps_f_, env_options_);
389 if (!s.ok()) {
390 return s;
391 }
392 }
393 return Status::OK();
394}
395
396Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
397 assert(header != nullptr);
398 Status s = ReadTraceRecord(header);
399 if (!s.ok()) {
400 return s;
401 }
402 if (header->type != kTraceBegin) {
403 return Status::Corruption("Corrupted trace file. Incorrect header.");
404 }
405 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
406 return Status::Corruption("Corrupted trace file. Incorrect magic.");
407 }
408
409 return s;
410}
411
412Status TraceAnalyzer::ReadTraceFooter(Trace* footer) {
413 assert(footer != nullptr);
414 Status s = ReadTraceRecord(footer);
415 if (!s.ok()) {
416 return s;
417 }
418 if (footer->type != kTraceEnd) {
419 return Status::Corruption("Corrupted trace file. Incorrect footer.");
420 }
421 return s;
422}
423
424Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
425 assert(trace != nullptr);
426 std::string encoded_trace;
427 Status s = trace_reader_->Read(&encoded_trace);
428 if (!s.ok()) {
429 return s;
430 }
431
432 Slice enc_slice = Slice(encoded_trace);
433 GetFixed64(&enc_slice, &trace->ts);
434 trace->type = static_cast<TraceType>(enc_slice[0]);
435 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
436 trace->payload = enc_slice.ToString();
437 return s;
438}
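
// Each encoded record consists of a fixed64 timestamp, a one-byte trace type,
// and a payload length field (kTraceTypeSize + kTracePayloadLengthSize bytes
// are skipped together above), followed by the payload itself.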
439
// Process the trace itself and redirect the trace content
// to the handler for each operation type. For a different trace
// format, this function can be changed.
443Status TraceAnalyzer::StartProcessing() {
444 Status s;
445 Trace header;
446 s = ReadTraceHeader(&header);
447 if (!s.ok()) {
448 fprintf(stderr, "Cannot read the header\n");
449 return s;
450 }
  trace_create_time_ = header.ts;
452 if (FLAGS_output_time_series) {
453 time_series_start_ = header.ts;
454 }
455
456 Trace trace;
457 while (s.ok()) {
458 trace.reset();
459 s = ReadTraceRecord(&trace);
460 if (!s.ok()) {
461 break;
462 }
463
464 total_requests_++;
465 end_time_ = trace.ts;
466 if (trace.type == kTraceWrite) {
467 total_writes_++;
468 c_time_ = trace.ts;
469 WriteBatch batch(trace.payload);
470
      // Note that, if the write happens in a transaction,
      // 'Write' will be called twice: once for Prepare and once for
      // Commit. Thus, in the trace, the same WriteBatch appears as
      // two records if it is in a transaction. Here, we only
      // process the record that is committed. If the write is not in a
      // transaction, HasBeginPrepare()==false, so we process it normally.
477 if (batch.HasBeginPrepare() && !batch.HasCommit()) {
478 continue;
479 }
480 TraceWriteHandler write_handler(this);
481 s = batch.Iterate(&write_handler);
482 if (!s.ok()) {
483 fprintf(stderr, "Cannot process the write batch in the trace\n");
484 return s;
485 }
486 } else if (trace.type == kTraceGet) {
487 uint32_t cf_id = 0;
488 Slice key;
489 DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
490 total_gets_++;
491
492 s = HandleGet(cf_id, key.ToString(), trace.ts, 1);
493 if (!s.ok()) {
494 fprintf(stderr, "Cannot process the get in the trace\n");
495 return s;
496 }
497 } else if (trace.type == kTraceIteratorSeek ||
498 trace.type == kTraceIteratorSeekForPrev) {
499 uint32_t cf_id = 0;
500 Slice key;
501 DecodeCFAndKeyFromString(trace.payload, &cf_id, &key);
502 s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type);
503 if (!s.ok()) {
504 fprintf(stderr, "Cannot process the iterator in the trace\n");
505 return s;
506 }
507 } else if (trace.type == kTraceEnd) {
508 break;
509 }
510 }
511 if (s.IsIncomplete()) {
    // FIXME: reaching EOF currently returns an Incomplete status;
    // treat it as the normal end of the trace.
514 return Status::OK();
515 }
516 return s;
517}
518
// After the trace is processed by StartProcessing, the statistics are
// stored in maps and other in-memory data structures. To get other
// statistics, such as the key size distribution and value size
// distribution, these data structures are re-processed here.
523Status TraceAnalyzer::MakeStatistics() {
524 int ret;
525 Status s;
526 for (int type = 0; type < kTaTypeNum; type++) {
527 if (!ta_[type].enabled) {
528 continue;
529 }
530 for (auto& stat : ta_[type].stats) {
531 stat.second.a_key_id = 0;
532 for (auto& record : stat.second.a_key_stats) {
533 record.second.key_id = stat.second.a_key_id;
534 stat.second.a_key_id++;
535 if (record.second.access_count <=
536 static_cast<uint64_t>(FLAGS_output_ignore_count)) {
537 continue;
538 }
539
540 // Generate the key access count distribution data
541 if (FLAGS_output_access_count_stats) {
542 if (stat.second.a_count_stats.find(record.second.access_count) ==
543 stat.second.a_count_stats.end()) {
544 stat.second.a_count_stats[record.second.access_count] = 1;
545 } else {
546 stat.second.a_count_stats[record.second.access_count]++;
547 }
548 }
549
550 // Generate the key size distribution data
        if (FLAGS_output_key_distribution) {
552 if (stat.second.a_key_size_stats.find(record.first.size()) ==
553 stat.second.a_key_size_stats.end()) {
554 stat.second.a_key_size_stats[record.first.size()] = 1;
555 } else {
556 stat.second.a_key_size_stats[record.first.size()]++;
557 }
558 }
559
560 if (!FLAGS_print_correlation.empty()) {
561 s = MakeStatisticCorrelation(stat.second, record.second);
562 if (!s.ok()) {
563 return s;
564 }
565 }
566 }
567
568 // Output the prefix cut or the whole content of the accessed key space
569 if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) {
570 s = MakeStatisticKeyStatsOrPrefix(stat.second);
571 if (!s.ok()) {
572 return s;
573 }
574 }
575
576 // output the access count distribution
577 if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) {
578 for (auto& record : stat.second.a_count_stats) {
579 ret = sprintf(buffer_, "access_count: %" PRIu64 " num: %" PRIu64 "\n",
580 record.first, record.second);
581 if (ret < 0) {
582 return Status::IOError("Format the output failed");
583 }
584 std::string printout(buffer_);
585 s = stat.second.a_count_dist_f->Append(printout);
586 if (!s.ok()) {
587 fprintf(stderr, "Write access count distribution file failed\n");
588 return s;
589 }
590 }
591 }
592
      // find the median of the key size
      uint64_t k_count = 0;
      bool get_mid = false;
596 for (auto& record : stat.second.a_key_size_stats) {
597 k_count += record.second;
        if (!get_mid && k_count >= stat.second.a_key_mid) {
          stat.second.a_key_mid = record.first;
600 get_mid = true;
601 }
602 if (FLAGS_output_key_distribution && stat.second.a_key_size_f) {
603 ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 "\n", record.first,
604 record.second);
605 if (ret < 0) {
606 return Status::IOError("Format output failed");
607 }
608 std::string printout(buffer_);
609 s = stat.second.a_key_size_f->Append(printout);
610 if (!s.ok()) {
611 fprintf(stderr, "Write key size distribution file failed\n");
612 return s;
613 }
614 }
615 }
616
617 // output the value size distribution
618 uint64_t v_begin = 0, v_end = 0, v_count = 0;
      get_mid = false;
620 for (auto& record : stat.second.a_value_size_stats) {
621 v_begin = v_end;
622 v_end = (record.first + 1) * FLAGS_value_interval;
623 v_count += record.second;
624 if (!get_mid && v_count >= stat.second.a_count / 2) {
625 stat.second.a_value_mid = (v_begin + v_end) / 2;
626 get_mid = true;
627 }
628 if (FLAGS_output_value_distribution && stat.second.a_value_size_f &&
629 (type == TraceOperationType::kPut ||
630 type == TraceOperationType::kMerge)) {
631 ret = sprintf(buffer_,
632 "Number_of_value_size_between %" PRIu64 " and %" PRIu64
633 " is: %" PRIu64 "\n",
634 v_begin, v_end, record.second);
635 if (ret < 0) {
636 return Status::IOError("Format output failed");
637 }
638 std::string printout(buffer_);
639 s = stat.second.a_value_size_f->Append(printout);
640 if (!s.ok()) {
641 fprintf(stderr, "Write value size distribution file failed\n");
642 return s;
643 }
644 }
645 }
646 }
647 }
648
649 // Make the QPS statistics
650 if (FLAGS_output_qps_stats) {
651 s = MakeStatisticQPS();
652 if (!s.ok()) {
653 return s;
654 }
655 }
656
657 return Status::OK();
658}
659
660// Process the statistics of the key access and
661// prefix of the accessed keys if required
662Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) {
663 int ret;
664 Status s;
665 std::string prefix = "0";
666 uint64_t prefix_access = 0;
667 uint64_t prefix_count = 0;
668 uint64_t prefix_succ_access = 0;
669 double prefix_ave_access = 0.0;
670 stats.a_succ_count = 0;
671 for (auto& record : stats.a_key_stats) {
672 // write the key access statistic file
673 if (!stats.a_key_f) {
674 return Status::IOError("Failed to open accessed_key_stats file.");
675 }
676 stats.a_succ_count += record.second.succ_count;
677 double succ_ratio = 0.0;
678 if (record.second.access_count > 0) {
679 succ_ratio = (static_cast<double>(record.second.succ_count)) /
680 record.second.access_count;
681 }
682 ret = sprintf(buffer_, "%u %zu %" PRIu64 " %" PRIu64 " %f\n",
683 record.second.cf_id, record.second.value_size,
684 record.second.key_id, record.second.access_count, succ_ratio);
685 if (ret < 0) {
686 return Status::IOError("Format output failed");
687 }
688 std::string printout(buffer_);
689 s = stats.a_key_f->Append(printout);
690 if (!s.ok()) {
691 fprintf(stderr, "Write key access file failed\n");
692 return s;
693 }
694
695 // write the prefix cut of the accessed keys
696 if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) {
697 if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) {
698 std::string prefix_out = rocksdb::LDBCommand::StringToHex(prefix);
699 if (prefix_count == 0) {
700 prefix_ave_access = 0.0;
701 } else {
702 prefix_ave_access =
703 (static_cast<double>(prefix_access)) / prefix_count;
704 }
705 double prefix_succ_ratio = 0.0;
706 if (prefix_access > 0) {
707 prefix_succ_ratio =
708 (static_cast<double>(prefix_succ_access)) / prefix_access;
709 }
710 ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n",
711 record.second.key_id, prefix_access, prefix_count,
712 prefix_ave_access, prefix_succ_ratio, prefix_out.c_str());
713 if (ret < 0) {
714 return Status::IOError("Format output failed");
715 }
716 std::string pout(buffer_);
717 s = stats.a_prefix_cut_f->Append(pout);
718 if (!s.ok()) {
719 fprintf(stderr, "Write accessed key prefix file failed\n");
720 return s;
721 }
722
723 // make the top k statistic for the prefix
724 if (static_cast<int32_t>(stats.top_k_prefix_access.size()) <
725 FLAGS_print_top_k_access) {
726 stats.top_k_prefix_access.push(
727 std::make_pair(prefix_access, prefix_out));
728 } else {
729 if (prefix_access > stats.top_k_prefix_access.top().first) {
730 stats.top_k_prefix_access.pop();
731 stats.top_k_prefix_access.push(
732 std::make_pair(prefix_access, prefix_out));
733 }
734 }
735
736 if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) <
737 FLAGS_print_top_k_access) {
738 stats.top_k_prefix_ave.push(
739 std::make_pair(prefix_ave_access, prefix_out));
740 } else {
741 if (prefix_ave_access > stats.top_k_prefix_ave.top().first) {
742 stats.top_k_prefix_ave.pop();
743 stats.top_k_prefix_ave.push(
744 std::make_pair(prefix_ave_access, prefix_out));
745 }
746 }
747
748 prefix = record.first.substr(0, FLAGS_output_prefix_cut);
749 prefix_access = 0;
750 prefix_count = 0;
751 prefix_succ_access = 0;
752 }
753 prefix_access += record.second.access_count;
754 prefix_count += 1;
755 prefix_succ_access += record.second.succ_count;
756 }
757 }
758 return Status::OK();
759}
760
761// Process the statistics of different query type
762// correlations
763Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
764 StatsUnit& unit) {
765 if (stats.correlation_output.size() !=
766 analyzer_opts_.correlation_list.size()) {
767 return Status::Corruption("Cannot make the statistic of correlation.");
768 }
769
770 for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size());
771 i++) {
772 if (i >= static_cast<int>(stats.correlation_output.size()) ||
773 i >= static_cast<int>(unit.v_correlation.size())) {
774 break;
775 }
776 stats.correlation_output[i].first += unit.v_correlation[i].count;
777 stats.correlation_output[i].second += unit.v_correlation[i].total_ts;
778 }
779 return Status::OK();
780}
781
782// Process the statistics of QPS
783Status TraceAnalyzer::MakeStatisticQPS() {
  if (begin_time_ == 0) {
785 begin_time_ = trace_create_time_;
786 }
787 uint32_t duration =
788 static_cast<uint32_t>((end_time_ - begin_time_) / 1000000);
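  // Trace timestamps are in microseconds, so dividing by 1000000 gives the
  // trace duration in seconds. type_qps below is indexed as [second][type],
  // with the extra column at index kTaTypeNum holding the per-second total
  // across all query types.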
789 int ret;
790 Status s;
791 std::vector<std::vector<uint32_t>> type_qps(
792 duration, std::vector<uint32_t>(kTaTypeNum + 1, 0));
793 std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0);
794 std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0);
795 qps_ave_.resize(kTaTypeNum + 1);
796
797 for (int type = 0; type < kTaTypeNum; type++) {
798 if (!ta_[type].enabled) {
799 continue;
800 }
801 for (auto& stat : ta_[type].stats) {
802 uint32_t time_line = 0;
803 uint64_t cf_qps_sum = 0;
804 for (auto& time_it : stat.second.a_qps_stats) {
805 if (time_it.first >= duration) {
806 continue;
807 }
808 type_qps[time_it.first][kTaTypeNum] += time_it.second;
809 type_qps[time_it.first][type] += time_it.second;
810 cf_qps_sum += time_it.second;
811 if (time_it.second > stat.second.a_peak_qps) {
812 stat.second.a_peak_qps = time_it.second;
813 }
814 if (stat.second.a_qps_f) {
815 while (time_line < time_it.first) {
816 ret = sprintf(buffer_, "%u\n", 0);
817 if (ret < 0) {
818 return Status::IOError("Format the output failed");
819 }
820 std::string printout(buffer_);
821 s = stat.second.a_qps_f->Append(printout);
822 if (!s.ok()) {
823 fprintf(stderr, "Write QPS file failed\n");
824 return s;
825 }
826 time_line++;
827 }
828 ret = sprintf(buffer_, "%u\n", time_it.second);
829 if (ret < 0) {
830 return Status::IOError("Format the output failed");
831 }
832 std::string printout(buffer_);
833 s = stat.second.a_qps_f->Append(printout);
834 if (!s.ok()) {
835 fprintf(stderr, "Write QPS file failed\n");
836 return s;
837 }
838 if (time_line == time_it.first) {
839 time_line++;
840 }
841 }
842
843 // Process the top k QPS peaks
844 if (FLAGS_output_prefix_cut > 0) {
845 if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) <
846 FLAGS_print_top_k_access) {
847 stat.second.top_k_qps_sec.push(
848 std::make_pair(time_it.second, time_it.first));
849 } else {
850 if (stat.second.top_k_qps_sec.size() > 0 &&
851 stat.second.top_k_qps_sec.top().first < time_it.second) {
852 stat.second.top_k_qps_sec.pop();
853 stat.second.top_k_qps_sec.push(
854 std::make_pair(time_it.second, time_it.first));
855 }
856 }
857 }
858 }
859 if (duration == 0) {
860 stat.second.a_ave_qps = 0;
861 } else {
862 stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration;
863 }
864
      // Output how the number of accessed unique keys changes over time
866 if (stat.second.a_key_num_f) {
867 uint64_t cur_uni_key =
868 static_cast<uint64_t>(stat.second.a_key_stats.size());
869 double cur_ratio = 0.0;
870 uint64_t cur_num = 0;
871 for (uint32_t i = 0; i < duration; i++) {
872 auto find_time = stat.second.uni_key_num.find(i);
873 if (find_time != stat.second.uni_key_num.end()) {
874 cur_ratio = (static_cast<double>(find_time->second)) / cur_uni_key;
875 cur_num = find_time->second;
876 }
877 ret = sprintf(buffer_, "%" PRIu64 " %.12f\n", cur_num, cur_ratio);
878 if (ret < 0) {
879 return Status::IOError("Format the output failed");
880 }
881 std::string printout(buffer_);
882 s = stat.second.a_key_num_f->Append(printout);
883 if (!s.ok()) {
884 fprintf(stderr,
885 "Write accessed unique key number change file failed\n");
886 return s;
887 }
888 }
889 }
890
891 // output the prefix of top k access peak
892 if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) {
893 while (!stat.second.top_k_qps_sec.empty()) {
894 ret = sprintf(buffer_, "At time: %u with QPS: %u\n",
895 stat.second.top_k_qps_sec.top().second,
896 stat.second.top_k_qps_sec.top().first);
897 if (ret < 0) {
898 return Status::IOError("Format the output failed");
899 }
900 std::string printout(buffer_);
901 s = stat.second.a_top_qps_prefix_f->Append(printout);
902 if (!s.ok()) {
903 fprintf(stderr, "Write prefix QPS top K file failed\n");
904 return s;
905 }
906 uint32_t qps_time = stat.second.top_k_qps_sec.top().second;
907 stat.second.top_k_qps_sec.pop();
908 if (stat.second.a_qps_prefix_stats.find(qps_time) !=
909 stat.second.a_qps_prefix_stats.end()) {
910 for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) {
911 std::string qps_prefix_out =
912 rocksdb::LDBCommand::StringToHex(qps_prefix.first);
913 ret = sprintf(buffer_, "The prefix: %s Access count: %u\n",
914 qps_prefix_out.c_str(), qps_prefix.second);
915 if (ret < 0) {
916 return Status::IOError("Format the output failed");
917 }
918 std::string pout(buffer_);
919 s = stat.second.a_top_qps_prefix_f->Append(pout);
920 if (!s.ok()) {
921 fprintf(stderr, "Write prefix QPS top K file failed\n");
922 return s;
923 }
924 }
925 }
926 }
927 }
928 }
929 }
930
931 if (qps_f_) {
932 for (uint32_t i = 0; i < duration; i++) {
933 for (int type = 0; type <= kTaTypeNum; type++) {
934 if (type < kTaTypeNum) {
935 ret = sprintf(buffer_, "%u ", type_qps[i][type]);
936 } else {
937 ret = sprintf(buffer_, "%u\n", type_qps[i][type]);
938 }
939 if (ret < 0) {
940 return Status::IOError("Format the output failed");
941 }
942 std::string printout(buffer_);
943 s = qps_f_->Append(printout);
944 if (!s.ok()) {
945 return s;
946 }
947 qps_sum[type] += type_qps[i][type];
948 if (type_qps[i][type] > qps_peak[type]) {
949 qps_peak[type] = type_qps[i][type];
950 }
951 }
952 }
953 }
954
955 if (cf_qps_f_) {
956 int cfs_size = static_cast<uint32_t>(cfs_.size());
957 uint32_t v;
958 for (uint32_t i = 0; i < duration; i++) {
959 for (int cf = 0; cf < cfs_size; cf++) {
960 if (cfs_[cf].cf_qps.find(i) != cfs_[cf].cf_qps.end()) {
961 v = cfs_[cf].cf_qps[i];
962 } else {
963 v = 0;
964 }
965 if (cf < cfs_size - 1) {
966 ret = sprintf(buffer_, "%u ", v);
967 } else {
968 ret = sprintf(buffer_, "%u\n", v);
969 }
970 if (ret < 0) {
971 return Status::IOError("Format the output failed");
972 }
973 std::string printout(buffer_);
974 s = cf_qps_f_->Append(printout);
975 if (!s.ok()) {
976 return s;
977 }
978 }
979 }
980 }
981
982 qps_peak_ = qps_peak;
983 for (int type = 0; type <= kTaTypeNum; type++) {
984 if (duration == 0) {
985 qps_ave_[type] = 0;
986 } else {
987 qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration;
988 }
989 }
990
991 return Status::OK();
992}
993
// In reprocessing, if we have the whole key space,
// we can output the access count of all keys in a CF
// and compute statistics over the whole key space.
// The top k accessed keys are also output here.
998Status TraceAnalyzer::ReProcessing() {
999 int ret;
1000 Status s;
1001 for (auto& cf_it : cfs_) {
1002 uint32_t cf_id = cf_it.first;
1003
1004 // output the time series;
1005 if (FLAGS_output_time_series) {
1006 for (int type = 0; type < kTaTypeNum; type++) {
1007 if (!ta_[type].enabled ||
1008 ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
1009 continue;
1010 }
1011 TraceStats& stat = ta_[type].stats[cf_id];
1012 if (!stat.time_series_f) {
1013 fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n",
1014 ta_[type].type_name.c_str(), cf_id);
1015 continue;
1016 }
1017 while (!stat.time_series.empty()) {
1018 uint64_t key_id = 0;
1019 auto found = stat.a_key_stats.find(stat.time_series.front().key);
1020 if (found != stat.a_key_stats.end()) {
1021 key_id = found->second.key_id;
1022 }
1023 ret = sprintf(buffer_, "%u %" PRIu64 " %" PRIu64 "\n",
1024 stat.time_series.front().type,
1025 stat.time_series.front().ts, key_id);
1026 if (ret < 0) {
1027 return Status::IOError("Format the output failed");
1028 }
1029 std::string printout(buffer_);
1030 s = stat.time_series_f->Append(printout);
1031 if (!s.ok()) {
1032 fprintf(stderr, "Write time series file failed\n");
1033 return s;
1034 }
1035 stat.time_series.pop_front();
1036 }
1037 }
1038 }
1039
1040 // process the whole key space if needed
1041 if (!FLAGS_key_space_dir.empty()) {
1042 std::string whole_key_path =
1043 FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
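      // Each whole-key-space file is expected to contain one key per line in
      // the hex form used by ldb (it is decoded below with
      // LDBCommand::HexToString); e.g. the key "key1" would appear as
      // 0x6B657931 (illustrative).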
1044 std::string input_key, get_key;
1045 std::vector<std::string> prefix(kTaTypeNum);
1046 std::istringstream iss;
1047 bool has_data = true;
1048 s = env_->NewSequentialFile(whole_key_path, &wkey_input_f_, env_options_);
1049 if (!s.ok()) {
1050 fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
1051 cf_id);
1052 wkey_input_f_.reset();
1053 }
1054 if (wkey_input_f_) {
1055 for (cfs_[cf_id].w_count = 0;
1056 ReadOneLine(&iss, wkey_input_f_.get(), &get_key, &has_data, &s);
1057 ++cfs_[cf_id].w_count) {
1058 if (!s.ok()) {
1059 fprintf(stderr, "Read whole key space file failed\n");
1060 return s;
1061 }
1062
1063 input_key = rocksdb::LDBCommand::HexToString(get_key);
1064 for (int type = 0; type < kTaTypeNum; type++) {
1065 if (!ta_[type].enabled) {
1066 continue;
1067 }
1068 TraceStats& stat = ta_[type].stats[cf_id];
1069 if (stat.w_key_f) {
1070 if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) {
1071 ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 "\n",
1072 cfs_[cf_id].w_count,
1073 stat.a_key_stats[input_key].access_count);
1074 if (ret < 0) {
1075 return Status::IOError("Format the output failed");
1076 }
1077 std::string printout(buffer_);
1078 s = stat.w_key_f->Append(printout);
1079 if (!s.ok()) {
1080 fprintf(stderr, "Write whole key space access file failed\n");
1081 return s;
1082 }
1083 }
1084 }
1085
1086 // Output the prefix cut file of the whole key space
1087 if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) {
1088 if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) !=
1089 0) {
1090 prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut);
1091 std::string prefix_out =
1092 rocksdb::LDBCommand::StringToHex(prefix[type]);
1093 ret = sprintf(buffer_, "%" PRIu64 " %s\n", cfs_[cf_id].w_count,
1094 prefix_out.c_str());
1095 if (ret < 0) {
1096 return Status::IOError("Format the output failed");
1097 }
1098 std::string printout(buffer_);
1099 s = stat.w_prefix_cut_f->Append(printout);
1100 if (!s.ok()) {
1101 fprintf(stderr,
1102 "Write whole key space prefix cut file failed\n");
1103 return s;
1104 }
1105 }
1106 }
1107 }
1108
          // Make the statistics of the key size distribution
          if (FLAGS_output_key_distribution) {
1111 if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) ==
1112 cfs_[cf_id].w_key_size_stats.end()) {
1113 cfs_[cf_id].w_key_size_stats[input_key.size()] = 1;
1114 } else {
1115 cfs_[cf_id].w_key_size_stats[input_key.size()]++;
1116 }
1117 }
1118 }
1119 }
1120 }
1121
1122 // process the top k accessed keys
1123 if (FLAGS_print_top_k_access > 0) {
1124 for (int type = 0; type < kTaTypeNum; type++) {
1125 if (!ta_[type].enabled ||
1126 ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
1127 continue;
1128 }
1129 TraceStats& stat = ta_[type].stats[cf_id];
1130 for (auto& record : stat.a_key_stats) {
1131 if (static_cast<int32_t>(stat.top_k_queue.size()) <
1132 FLAGS_print_top_k_access) {
1133 stat.top_k_queue.push(
1134 std::make_pair(record.second.access_count, record.first));
1135 } else {
1136 if (record.second.access_count > stat.top_k_queue.top().first) {
1137 stat.top_k_queue.pop();
1138 stat.top_k_queue.push(
1139 std::make_pair(record.second.access_count, record.first));
1140 }
1141 }
1142 }
1143 }
1144 }
1145 }
1146 return Status::OK();
1147}
1148
1149// End the processing, print the requested results
1150Status TraceAnalyzer::EndProcessing() {
1151 if (trace_sequence_f_) {
1152 trace_sequence_f_->Close();
1153 }
1154 if (FLAGS_no_print) {
1155 return Status::OK();
1156 }
1157 PrintStatistics();
1158 CloseOutputFiles();
1159 return Status::OK();
1160}
1161
1162// Insert the corresponding key statistics to the correct type
1163// and correct CF, output the time-series file if needed
1164Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type,
1165 const uint32_t& cf_id,
1166 const std::string& key,
1167 const size_t value_size,
1168 const uint64_t ts) {
1169 Status s;
1170 StatsUnit unit;
1171 unit.key_id = 0;
1172 unit.cf_id = cf_id;
1173 unit.value_size = value_size;
1174 unit.access_count = 1;
1175 unit.latest_ts = ts;
1176 if (type != TraceOperationType::kGet || value_size > 0) {
1177 unit.succ_count = 1;
1178 } else {
1179 unit.succ_count = 0;
1180 }
1181 unit.v_correlation.resize(analyzer_opts_.correlation_list.size());
1182 for (int i = 0;
1183 i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) {
1184 unit.v_correlation[i].count = 0;
1185 unit.v_correlation[i].total_ts = 0;
1186 }
1187 std::string prefix;
1188 if (FLAGS_output_prefix_cut > 0) {
1189 prefix = key.substr(0, FLAGS_output_prefix_cut);
1190 }
1191
1192 if (begin_time_ == 0) {
1193 begin_time_ = ts;
1194 }
1195 uint32_t time_in_sec;
1196 if (ts < begin_time_) {
1197 time_in_sec = 0;
1198 } else {
1199 time_in_sec = static_cast<uint32_t>((ts - begin_time_) / 1000000);
1200 }
1201
1202 uint64_t dist_value_size = value_size / FLAGS_value_interval;
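  // Value sizes are bucketed by FLAGS_value_interval; e.g. with the default
  // interval of 8, a 21-byte value lands in bucket 2, which nominally covers
  // sizes [16, 24).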
1203 auto found_stats = ta_[type].stats.find(cf_id);
1204 if (found_stats == ta_[type].stats.end()) {
1205 ta_[type].stats[cf_id].cf_id = cf_id;
1206 ta_[type].stats[cf_id].cf_name = std::to_string(cf_id);
1207 ta_[type].stats[cf_id].a_count = 1;
1208 ta_[type].stats[cf_id].a_key_id = 0;
1209 ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow(
1210 static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
1211 ta_[type].stats[cf_id].a_key_size_sum = key.size();
1212 ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow(
1213 static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
1214 ta_[type].stats[cf_id].a_value_size_sum = value_size;
1215 s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]);
1216 if (!FLAGS_print_correlation.empty()) {
1217 s = StatsUnitCorrelationUpdate(unit, type, ts, key);
1218 }
1219 ta_[type].stats[cf_id].a_key_stats[key] = unit;
1220 ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1;
1221 ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1;
1222 ta_[type].stats[cf_id].correlation_output.resize(
1223 analyzer_opts_.correlation_list.size());
1224 if (FLAGS_output_prefix_cut > 0) {
1225 std::map<std::string, uint32_t> tmp_qps_map;
1226 tmp_qps_map[prefix] = 1;
1227 ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
1228 }
1229 if (time_in_sec != cur_time_sec_) {
1230 ta_[type].stats[cf_id].uni_key_num[cur_time_sec_] =
1231 static_cast<uint64_t>(ta_[type].stats[cf_id].a_key_stats.size());
1232 cur_time_sec_ = time_in_sec;
1233 }
1234 } else {
1235 found_stats->second.a_count++;
1236 found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow(
1237 static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
1238 found_stats->second.a_key_size_sum += key.size();
1239 found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow(
1240 static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
1241 found_stats->second.a_value_size_sum += value_size;
1242 auto found_key = found_stats->second.a_key_stats.find(key);
1243 if (found_key == found_stats->second.a_key_stats.end()) {
1244 found_stats->second.a_key_stats[key] = unit;
1245 } else {
1246 found_key->second.access_count++;
1247 if (type != TraceOperationType::kGet || value_size > 0) {
1248 found_key->second.succ_count++;
1249 }
1250 if (!FLAGS_print_correlation.empty()) {
1251 s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key);
1252 }
1253 }
1254 if (time_in_sec != cur_time_sec_) {
1255 found_stats->second.uni_key_num[cur_time_sec_] =
1256 static_cast<uint64_t>(found_stats->second.a_key_stats.size());
1257 cur_time_sec_ = time_in_sec;
1258 }
1259
1260 auto found_value =
1261 found_stats->second.a_value_size_stats.find(dist_value_size);
1262 if (found_value == found_stats->second.a_value_size_stats.end()) {
1263 found_stats->second.a_value_size_stats[dist_value_size] = 1;
1264 } else {
1265 found_value->second++;
1266 }
1267
1268 auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec);
1269 if (found_qps == found_stats->second.a_qps_stats.end()) {
1270 found_stats->second.a_qps_stats[time_in_sec] = 1;
1271 } else {
1272 found_qps->second++;
1273 }
1274
1275 if (FLAGS_output_prefix_cut > 0) {
1276 auto found_qps_prefix =
1277 found_stats->second.a_qps_prefix_stats.find(time_in_sec);
1278 if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) {
1279 std::map<std::string, uint32_t> tmp_qps_map;
1280 found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
1281 }
1282 if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) ==
1283 found_stats->second.a_qps_prefix_stats[time_in_sec].end()) {
1284 found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1;
1285 } else {
1286 found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++;
1287 }
1288 }
1289 }
1290
1291 if (cfs_.find(cf_id) == cfs_.end()) {
1292 CfUnit cf_unit;
1293 cf_unit.cf_id = cf_id;
1294 cf_unit.w_count = 0;
1295 cf_unit.a_count = 0;
1296 cfs_[cf_id] = cf_unit;
1297 }
1298
1299 if (FLAGS_output_qps_stats) {
1300 cfs_[cf_id].cf_qps[time_in_sec]++;
1301 }
1302
1303 if (FLAGS_output_time_series) {
1304 TraceUnit trace_u;
1305 trace_u.type = type;
1306 trace_u.key = key;
1307 trace_u.value_size = value_size;
1308 trace_u.ts = (ts - time_series_start_) / 1000000;
1309 trace_u.cf_id = cf_id;
1310 ta_[type].stats[cf_id].time_series.push_back(trace_u);
1311 }
1312
1313 return Status::OK();
1314}
1315
1316// Update the correlation unit of each key if enabled
1317Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit,
1318 const uint32_t& type_second,
1319 const uint64_t& ts,
1320 const std::string& key) {
1321 if (type_second >= kTaTypeNum) {
1322 fprintf(stderr, "Unknown Type Id: %u\n", type_second);
1323 return Status::NotFound();
1324 }
1325
1326 for (int type_first = 0; type_first < kTaTypeNum; type_first++) {
1327 if (type_first >= static_cast<int>(ta_.size()) ||
1328 type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) {
1329 break;
1330 }
1331 if (analyzer_opts_.correlation_map[type_first][type_second] < 0 ||
1332 ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() ||
1333 ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) ==
1334 ta_[type_first].stats[unit.cf_id].a_key_stats.end() ||
1335 ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) {
1336 continue;
1337 }
1338
1339 int correlation_id =
1340 analyzer_opts_.correlation_map[type_first][type_second];
1341
    // After finding a preceding operation of type_first on the same key,
    // update the pair count and the accumulated time interval.
1343 if (correlation_id < 0 ||
1344 correlation_id >= static_cast<int>(unit.v_correlation.size())) {
1345 continue;
1346 }
1347 unit.v_correlation[correlation_id].count++;
1348 unit.v_correlation[correlation_id].total_ts +=
1349 (ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts);
1350 }
1351
1352 unit.latest_ts = ts;
1353 return Status::OK();
1354}
1355
// When a new trace statistics object is created, the file handle
// pointers should be initialized if needed, according to
// the trace analyzer options
1359Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type,
1360 TraceStats& new_stats) {
1361 Status s;
1362 if (FLAGS_output_key_stats) {
1363 s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt",
1364 &new_stats.a_key_f);
1365 s = CreateOutputFile(type, new_stats.cf_name,
1366 "accessed_unique_key_num_change.txt",
1367 &new_stats.a_key_num_f);
1368 if (!FLAGS_key_space_dir.empty()) {
1369 s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt",
1370 &new_stats.w_key_f);
1371 }
1372 }
1373
1374 if (FLAGS_output_access_count_stats) {
1375 s = CreateOutputFile(type, new_stats.cf_name,
1376 "accessed_key_count_distribution.txt",
1377 &new_stats.a_count_dist_f);
1378 }
1379
1380 if (FLAGS_output_prefix_cut > 0) {
1381 s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt",
1382 &new_stats.a_prefix_cut_f);
1383 if (!FLAGS_key_space_dir.empty()) {
1384 s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt",
1385 &new_stats.w_prefix_cut_f);
1386 }
1387
1388 if (FLAGS_output_qps_stats) {
1389 s = CreateOutputFile(type, new_stats.cf_name,
1390 "accessed_top_k_qps_prefix_cut.txt",
1391 &new_stats.a_top_qps_prefix_f);
1392 }
1393 }
1394
1395 if (FLAGS_output_time_series) {
1396 s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt",
1397 &new_stats.time_series_f);
1398 }
1399
1400 if (FLAGS_output_value_distribution) {
1401 s = CreateOutputFile(type, new_stats.cf_name,
1402 "accessed_value_size_distribution.txt",
1403 &new_stats.a_value_size_f);
1404 }
1405
1406 if (FLAGS_output_key_distribution) {
1407 s = CreateOutputFile(type, new_stats.cf_name,
1408 "accessed_key_size_distribution.txt",
1409 &new_stats.a_key_size_f);
1410 }
1411
1412 if (FLAGS_output_qps_stats) {
1413 s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt",
1414 &new_stats.a_qps_f);
1415 }
1416
1417 return Status::OK();
1418}
1419
1420// create the output path of the files to be opened
1421Status TraceAnalyzer::CreateOutputFile(
1422 const std::string& type, const std::string& cf_name,
1423 const std::string& ending, std::unique_ptr<rocksdb::WritableFile>* f_ptr) {
1424 std::string path;
1425 path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name +
1426 "-" + ending;
1427 Status s;
1428 s = env_->NewWritableFile(path, f_ptr, env_options_);
1429 if (!s.ok()) {
1430 fprintf(stderr, "Cannot open file: %s\n", path.c_str());
1431 exit(1);
1432 }
1433 return Status::OK();
1434}
1435
1436// Close the output files in the TraceStats if they are opened
1437void TraceAnalyzer::CloseOutputFiles() {
1438 for (int type = 0; type < kTaTypeNum; type++) {
1439 if (!ta_[type].enabled) {
1440 continue;
1441 }
1442 for (auto& stat : ta_[type].stats) {
1443 if (stat.second.time_series_f) {
1444 stat.second.time_series_f->Close();
1445 }
1446
1447 if (stat.second.a_key_f) {
1448 stat.second.a_key_f->Close();
1449 }
1450
1451 if (stat.second.a_key_num_f) {
1452 stat.second.a_key_num_f->Close();
1453 }
1454
1455 if (stat.second.a_count_dist_f) {
1456 stat.second.a_count_dist_f->Close();
1457 }
1458
1459 if (stat.second.a_prefix_cut_f) {
1460 stat.second.a_prefix_cut_f->Close();
1461 }
1462
1463 if (stat.second.a_value_size_f) {
1464 stat.second.a_value_size_f->Close();
1465 }
1466
1467 if (stat.second.a_key_size_f) {
1468 stat.second.a_key_size_f->Close();
1469 }
1470
1471 if (stat.second.a_qps_f) {
1472 stat.second.a_qps_f->Close();
1473 }
1474
1475 if (stat.second.a_top_qps_prefix_f) {
1476 stat.second.a_top_qps_prefix_f->Close();
1477 }
1478
1479 if (stat.second.w_key_f) {
1480 stat.second.w_key_f->Close();
1481 }
1482 if (stat.second.w_prefix_cut_f) {
1483 stat.second.w_prefix_cut_f->Close();
1484 }
1485 }
1486 }
1487 return;
1488}
1489
1490// Handle the Get request in the trace
1491Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
1492 const std::string& key, const uint64_t& ts,
1493 const uint32_t& get_ret) {
1494 Status s;
1495 size_t value_size = 0;
1496 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1497 s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
1498 value_size, ts);
1499 if (!s.ok()) {
1500 return Status::Corruption("Failed to write the trace sequence to file");
1501 }
1502 }
1503
1504 if (ta_[TraceOperationType::kGet].sample_count >= sample_max_) {
1505 ta_[TraceOperationType::kGet].sample_count = 0;
1506 }
1507 if (ta_[TraceOperationType::kGet].sample_count > 0) {
1508 ta_[TraceOperationType::kGet].sample_count++;
1509 return Status::OK();
1510 }
1511 ta_[TraceOperationType::kGet].sample_count++;
1512
1513 if (!ta_[TraceOperationType::kGet].enabled) {
1514 return Status::OK();
1515 }
1516 if (get_ret == 1) {
1517 value_size = 10;
1518 }
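  // A found Get is recorded with a non-zero placeholder value size (the trace
  // does not carry the actual value), which KeyStatsInsertion counts as a
  // successful access for kGet.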
1519 s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
1520 value_size, ts);
1521 if (!s.ok()) {
1522 return Status::Corruption("Failed to insert key statistics");
1523 }
1524 return s;
1525}
1526
1527// Handle the Put request in the write batch of the trace
1528Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
1529 const Slice& value) {
1530 Status s;
1531 size_t value_size = value.ToString().size();
1532 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1533 s = WriteTraceSequence(TraceOperationType::kPut, column_family_id,
1534 key.ToString(), value_size, c_time_);
1535 if (!s.ok()) {
1536 return Status::Corruption("Failed to write the trace sequence to file");
1537 }
1538 }
1539
1540 if (ta_[TraceOperationType::kPut].sample_count >= sample_max_) {
1541 ta_[TraceOperationType::kPut].sample_count = 0;
1542 }
1543 if (ta_[TraceOperationType::kPut].sample_count > 0) {
1544 ta_[TraceOperationType::kPut].sample_count++;
1545 return Status::OK();
1546 }
1547 ta_[TraceOperationType::kPut].sample_count++;
1548
1549 if (!ta_[TraceOperationType::kPut].enabled) {
1550 return Status::OK();
1551 }
1552 s = KeyStatsInsertion(TraceOperationType::kPut, column_family_id,
1553 key.ToString(), value_size, c_time_);
1554 if (!s.ok()) {
1555 return Status::Corruption("Failed to insert key statistics");
1556 }
1557 return s;
1558}
1559
1560// Handle the Delete request in the write batch of the trace
1561Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
1562 const Slice& key) {
1563 Status s;
1564 size_t value_size = 0;
1565 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1566 s = WriteTraceSequence(TraceOperationType::kDelete, column_family_id,
1567 key.ToString(), value_size, c_time_);
1568 if (!s.ok()) {
1569 return Status::Corruption("Failed to write the trace sequence to file");
1570 }
1571 }
1572
1573 if (ta_[TraceOperationType::kDelete].sample_count >= sample_max_) {
1574 ta_[TraceOperationType::kDelete].sample_count = 0;
1575 }
1576 if (ta_[TraceOperationType::kDelete].sample_count > 0) {
1577 ta_[TraceOperationType::kDelete].sample_count++;
1578 return Status::OK();
1579 }
1580 ta_[TraceOperationType::kDelete].sample_count++;
1581
1582 if (!ta_[TraceOperationType::kDelete].enabled) {
1583 return Status::OK();
1584 }
1585 s = KeyStatsInsertion(TraceOperationType::kDelete, column_family_id,
1586 key.ToString(), value_size, c_time_);
1587 if (!s.ok()) {
1588 return Status::Corruption("Failed to insert key statistics");
1589 }
1590 return s;
1591}
1592
1593// Handle the SingleDelete request in the write batch of the trace
1594Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
1595 const Slice& key) {
1596 Status s;
1597 size_t value_size = 0;
1598 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1599 s = WriteTraceSequence(TraceOperationType::kSingleDelete, column_family_id,
1600 key.ToString(), value_size, c_time_);
1601 if (!s.ok()) {
1602 return Status::Corruption("Failed to write the trace sequence to file");
1603 }
1604 }
1605
1606 if (ta_[TraceOperationType::kSingleDelete].sample_count >= sample_max_) {
1607 ta_[TraceOperationType::kSingleDelete].sample_count = 0;
1608 }
1609 if (ta_[TraceOperationType::kSingleDelete].sample_count > 0) {
1610 ta_[TraceOperationType::kSingleDelete].sample_count++;
1611 return Status::OK();
1612 }
1613 ta_[TraceOperationType::kSingleDelete].sample_count++;
1614
1615 if (!ta_[TraceOperationType::kSingleDelete].enabled) {
1616 return Status::OK();
1617 }
1618 s = KeyStatsInsertion(TraceOperationType::kSingleDelete, column_family_id,
1619 key.ToString(), value_size, c_time_);
1620 if (!s.ok()) {
1621 return Status::Corruption("Failed to insert key statistics");
1622 }
1623 return s;
1624}
1625
1626// Handle the DeleteRange request in the write batch of the trace
1627Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
1628 const Slice& begin_key,
1629 const Slice& end_key) {
1630 Status s;
1631 size_t value_size = 0;
1632 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1633 s = WriteTraceSequence(TraceOperationType::kRangeDelete, column_family_id,
1634 begin_key.ToString(), value_size, c_time_);
1635 if (!s.ok()) {
1636 return Status::Corruption("Failed to write the trace sequence to file");
1637 }
1638 }
1639
1640 if (ta_[TraceOperationType::kRangeDelete].sample_count >= sample_max_) {
1641 ta_[TraceOperationType::kRangeDelete].sample_count = 0;
1642 }
1643 if (ta_[TraceOperationType::kRangeDelete].sample_count > 0) {
1644 ta_[TraceOperationType::kRangeDelete].sample_count++;
1645 return Status::OK();
1646 }
1647 ta_[TraceOperationType::kRangeDelete].sample_count++;
1648
1649 if (!ta_[TraceOperationType::kRangeDelete].enabled) {
1650 return Status::OK();
1651 }
1652 s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
1653 begin_key.ToString(), value_size, c_time_);
1654 s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
1655 end_key.ToString(), value_size, c_time_);
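  // Both the begin and end keys of the range are recorded as kRangeDelete
  // accesses; note that only the status of the second insertion is checked
  // below.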
1656 if (!s.ok()) {
1657 return Status::Corruption("Failed to insert key statistics");
1658 }
1659 return s;
1660}
1661
1662// Handle the Merge request in the write batch of the trace
1663Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
1664 const Slice& value) {
1665 Status s;
1666 size_t value_size = value.ToString().size();
1667 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1668 s = WriteTraceSequence(TraceOperationType::kMerge, column_family_id,
1669 key.ToString(), value_size, c_time_);
1670 if (!s.ok()) {
1671 return Status::Corruption("Failed to write the trace sequence to file");
1672 }
1673 }
1674
1675 if (ta_[TraceOperationType::kMerge].sample_count >= sample_max_) {
1676 ta_[TraceOperationType::kMerge].sample_count = 0;
1677 }
1678 if (ta_[TraceOperationType::kMerge].sample_count > 0) {
1679 ta_[TraceOperationType::kMerge].sample_count++;
1680 return Status::OK();
1681 }
1682 ta_[TraceOperationType::kMerge].sample_count++;
1683
1684 if (!ta_[TraceOperationType::kMerge].enabled) {
1685 return Status::OK();
1686 }
1687 s = KeyStatsInsertion(TraceOperationType::kMerge, column_family_id,
1688 key.ToString(), value_size, c_time_);
1689 if (!s.ok()) {
1690 return Status::Corruption("Failed to insert key statistics");
1691 }
1692 return s;
1693}
1694
1695// Handle the Iterator request in the trace
1696Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
1697 const std::string& key, const uint64_t& ts,
1698 TraceType& trace_type) {
1699 Status s;
1700 size_t value_size = 0;
1701 int type = -1;
1702 if (trace_type == kTraceIteratorSeek) {
1703 type = TraceOperationType::kIteratorSeek;
1704 } else if (trace_type == kTraceIteratorSeekForPrev) {
1705 type = TraceOperationType::kIteratorSeekForPrev;
1706 } else {
1707 return s;
1708 }
1709 if (type == -1) {
1710 return s;
1711 }
1712
1713 if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
1714 s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
1715 if (!s.ok()) {
1716 return Status::Corruption("Failed to write the trace sequence to file");
1717 }
1718 }
1719
1720 if (ta_[type].sample_count >= sample_max_) {
1721 ta_[type].sample_count = 0;
1722 }
1723 if (ta_[type].sample_count > 0) {
1724 ta_[type].sample_count++;
1725 return Status::OK();
1726 }
1727 ta_[type].sample_count++;
1728
1729 if (!ta_[type].enabled) {
1730 return Status::OK();
1731 }
1732 s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
1733 if (!s.ok()) {
1734 return Status::Corruption("Failed to insert key statistics");
1735 }
1736 return s;
1737}
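// The two iterator trace types handled above correspond to client-side Seek
// and SeekForPrev calls. A minimal sketch of the calls that generate such
// records (hypothetical helper, not part of this tool; the DB is assumed to be
// owned by the caller and tracing is assumed to be active):
namespace {
inline void IssueIteratorSeeks(DB* db, const Slice& target) {
  ReadOptions read_options;
  std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
  iter->Seek(target);         // recorded as kTraceIteratorSeek
  iter->SeekForPrev(target);  // recorded as kTraceIteratorSeekForPrev
}
}  // namespace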
1738
1739// Before the analyzer is closed, the requested general statistic results are
1740// printed out here. At the current stage, this information is not written to
1741// the output files.
1742// -----type
1743// |__cf_id
1744// |_statistics
1745void TraceAnalyzer::PrintStatistics() {
1746 for (int type = 0; type < kTaTypeNum; type++) {
1747 if (!ta_[type].enabled) {
1748 continue;
1749 }
1750 ta_[type].total_keys = 0;
1751 ta_[type].total_access = 0;
1752 ta_[type].total_succ_access = 0;
1753 printf("\n################# Operation Type: %s #####################\n",
1754 ta_[type].type_name.c_str());
1755 if (qps_ave_.size() == kTaTypeNum + 1) {
1756 printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type],
1757 qps_ave_[type]);
1758 }
1759 for (auto& stat_it : ta_[type].stats) {
1760 if (stat_it.second.a_count == 0) {
1761 continue;
1762 }
1763 TraceStats& stat = stat_it.second;
1764 uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size());
1765 double key_size_ave = 0.0;
1766 double value_size_ave = 0.0;
1767 double key_size_vari = 0.0;
1768 double value_size_vari = 0.0;
1769 if (stat.a_count > 0) {
1770 key_size_ave =
1771 (static_cast<double>(stat.a_key_size_sum)) / stat.a_count;
1772 value_size_ave =
1773 (static_cast<double>(stat.a_value_size_sum)) / stat.a_count;
1774 key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) /
1775 stat.a_count -
1776 key_size_ave * key_size_ave);
1777 value_size_vari = std::sqrt(
1778 (static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count -
1779 value_size_ave * value_size_ave);
1780 }
1781 if (value_size_ave == 0.0) {
1782 stat.a_value_mid = 0;
1783 }
1784 cfs_[stat.cf_id].a_count += total_a_keys;
1785 ta_[type].total_keys += total_a_keys;
1786 ta_[type].total_access += stat.a_count;
1787 ta_[type].total_succ_access += stat.a_succ_count;
1788 printf("*********************************************************\n");
1789 printf("column family id: %u\n", stat.cf_id);
1790 printf("Total number of queries to this cf by %s: %" PRIu64 "\n",
1791 ta_[type].type_name.c_str(), stat.a_count);
1792 printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys);
1793 printf("Average key size: %f key size median: %" PRIu64
1794 " Key size standard deviation: %f\n",
1795 key_size_ave, stat.a_key_mid, key_size_vari);
1796 if (type == kPut || type == kMerge) {
1797 printf("Average value size: %f Value size median: %" PRIu64
1798 " Value size standard deviation: %f\n",
1799 value_size_ave, stat.a_value_mid, value_size_vari);
1800 }
1801 printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps,
1802 stat.a_ave_qps);
1803
1804 // print the top k accessed keys and their access counts
1805 if (FLAGS_print_top_k_access > 0) {
1806 printf("The Top %d keys that are accessed:\n",
1807 FLAGS_print_top_k_access);
1808 while (!stat.top_k_queue.empty()) {
1809 std::string hex_key =
1810 rocksdb::LDBCommand::StringToHex(stat.top_k_queue.top().second);
1811 printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first,
1812 hex_key.c_str());
1813 stat.top_k_queue.pop();
1814 }
1815 }
1816
1817 // print the top k accessed prefix ranges and the
1818 // top k prefix ranges with the highest average access per key
1819 if (FLAGS_output_prefix_cut > 0) {
1820 printf("The Top %d accessed prefix ranges:\n", FLAGS_print_top_k_access);
1821 while (!stat.top_k_prefix_access.empty()) {
1822 printf("Prefix: %s Access count: %" PRIu64 "\n",
1823 stat.top_k_prefix_access.top().second.c_str(),
1824 stat.top_k_prefix_access.top().first);
1825 stat.top_k_prefix_access.pop();
1826 }
1827
1828 printf("The Top %d prefixes with the highest access per key:\n",
1829 FLAGS_print_top_k_access);
1830 while (!stat.top_k_prefix_ave.empty()) {
1831 printf("Prefix: %s access per key: %f\n",
1832 stat.top_k_prefix_ave.top().second.c_str(),
1833 stat.top_k_prefix_ave.top().first);
1834 stat.top_k_prefix_ave.pop();
1835 }
1836 }
1837
1838 // print the operation correlations
1839 if (!FLAGS_print_correlation.empty()) {
1840 for (int correlation = 0;
1841 correlation <
1842 static_cast<int>(analyzer_opts_.correlation_list.size());
1843 correlation++) {
1844 printf(
1845 "The correlation statistics of '%s' after '%s' are:",
1846 taIndexToOpt[analyzer_opts_.correlation_list[correlation].second]
1847 .c_str(),
1848 taIndexToOpt[analyzer_opts_.correlation_list[correlation].first]
1849 .c_str());
1850 double correlation_ave = 0.0;
1851 if (stat.correlation_output[correlation].first > 0) {
1852 correlation_ave =
1853 (static_cast<double>(
1854 stat.correlation_output[correlation].second)) /
1855 (stat.correlation_output[correlation].first * 1000);
1856 }
1857 printf(" total numbers: %" PRIu64 " average time: %f(ms)\n",
1858 stat.correlation_output[correlation].first, correlation_ave);
1859 }
1860 }
1861 }
1862 printf("*********************************************************\n");
1863 printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(),
1864 ta_[type].total_keys);
1865 printf("Total access is: %" PRIu64 "\n", ta_[type].total_access);
1866 total_access_keys_ += ta_[type].total_keys;
1867 }
1868
1869 // Print the overall statistic information of the trace
1870 printf("\n*********************************************************\n");
1871 printf("*********************************************************\n");
1872 printf("The column family based statistics\n");
1873 for (auto& cf : cfs_) {
1874 printf("The column family id: %u\n", cf.first);
1875 printf("Number of keys in the whole key space: %" PRIu64 "\n", cf.second.w_count);
1876 printf("Number of keys in the accessed key space: %" PRIu64 "\n",
1877 cf.second.a_count);
1878 }
1879
1880 if (FLAGS_print_overall_stats) {
1881 printf("\n*********************************************************\n");
1882 printf("*********************************************************\n");
1883 if (qps_peak_.size() == kTaTypeNum + 1) {
1884 printf("Average QPS: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum],
1885 qps_peak_[kTaTypeNum]);
1886 }
1887 printf("The statistics related to query numbers need to be multiplied by: %u\n",
1888 sample_max_);
1889 printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
1890 " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
1891 total_requests_, total_access_keys_, total_gets_, total_writes_);
1892 for (int type = 0; type < kTaTypeNum; type++) {
1893 if (!ta_[type].enabled) {
1894 continue;
1895 }
1896 printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(),
1897 ta_[type].total_access);
1898 }
1899 }
1900}
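// The averages and spreads printed above are computed from running sums
// (sum, squared sum, and count) that are accumulated while the trace is
// processed. A minimal standalone sketch of that computation (hypothetical
// helper): the mean is sum/count and the spread is the population standard
// deviation sqrt(E[X^2] - (E[X])^2).
namespace {
inline void MeanAndStddevFromSums(uint64_t sum, uint64_t square_sum,
                                  uint64_t count, double* mean,
                                  double* stddev) {
  if (count == 0) {
    *mean = 0.0;
    *stddev = 0.0;
    return;
  }
  *mean = static_cast<double>(sum) / count;
  double variance =
      static_cast<double>(square_sum) / count - (*mean) * (*mean);
  // Guard against tiny negative values caused by floating-point rounding.
  *stddev = std::sqrt(variance > 0.0 ? variance : 0.0);
}
}  // namespace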
1901
1902// Write the trace sequence to file
1903Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
1904 const uint32_t& cf_id,
1905 const std::string& key,
1906 const size_t value_size,
1907 const uint64_t ts) {
1908 std::string hex_key = rocksdb::LDBCommand::StringToHex(key);
1909 int ret;
1910 ret =
1911 sprintf(buffer_, "%u %u %zu %" PRIu64 "\n", type, cf_id, value_size, ts);
1912 if (ret < 0) {
1913 return Status::IOError("failed to format the output");
1914 }
1915 std::string printout(buffer_);
1916 if (!FLAGS_no_key) {
1917 printout = hex_key + " " + printout;
1918 }
1919 return trace_sequence_f_->Append(printout);
1920}
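// Each line appended above has the layout "[<hex_key> ]<type> <cf_id>
// <value_size> <timestamp>", with the hex key omitted when -no_key is set.
// A minimal sketch of reading one such line back (hypothetical helper,
// assuming the key column is present):
namespace {
inline bool ParseTraceSequenceLine(const std::string& line,
                                   std::string* hex_key, uint32_t* type,
                                   uint32_t* cf_id, size_t* value_size,
                                   uint64_t* ts) {
  std::istringstream iss(line);
  return static_cast<bool>(iss >> *hex_key >> *type >> *cf_id >> *value_size >>
                           *ts);
}
}  // namespace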
1921
1922// The entry point of the trace analyzer tool
1923int trace_analyzer_tool(int argc, char** argv) {
1924 std::string trace_path;
1925 std::string output_path;
1926
1927 AnalyzerOptions analyzer_opts;
1928
1929 ParseCommandLineFlags(&argc, &argv, true);
1930
1931 if (!FLAGS_print_correlation.empty()) {
1932 analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation);
1933 }
1934
1935 std::unique_ptr<TraceAnalyzer> analyzer(
1936 new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts));
1937
1938 if (!analyzer) {
1939 fprintf(stderr, "Cannot initialize the trace analyzer\n");
1940 exit(1);
1941 }
1942
1943 rocksdb::Status s = analyzer->PrepareProcessing();
1944 if (!s.ok()) {
1945 fprintf(stderr, "%s\n", s.getState());
1946 fprintf(stderr, "Cannot initialize the trace reader\n");
1947 exit(1);
1948 }
1949
1950 s = analyzer->StartProcessing();
1951 if (!s.ok() && !FLAGS_try_process_corrupted_trace) {
1952 fprintf(stderr, "%s\n", s.getState());
1953 fprintf(stderr, "Cannot process the trace\n");
1954 exit(1);
1955 }
1956
1957 s = analyzer->MakeStatistics();
1958 if (!s.ok()) {
1959 fprintf(stderr, "%s\n", s.getState());
1960 analyzer->EndProcessing();
1961 fprintf(stderr, "Cannot make the statistics\n");
1962 exit(1);
1963 }
1964
1965 s = analyzer->ReProcessing();
1966 if (!s.ok()) {
1967 fprintf(stderr, "%s\n", s.getState());
1968 fprintf(stderr, "Cannot re-process the trace for more statistics\n");
1969 analyzer->EndProcessing();
1970 exit(1);
1971 }
1972
1973 s = analyzer->EndProcessing();
1974 if (!s.ok()) {
1975 fprintf(stderr, "%s\n", s.getState());
1976 fprintf(stderr, "Cannot close the trace analyzer\n");
1977 exit(1);
1978 }
1979
1980 return 0;
1981}
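// A typical way to drive trace_analyzer_tool() is a thin main() wrapper. A
// minimal sketch of such a wrapper and of an invocation; the binary name,
// paths, and flag values are illustrative only (the flags shown are defined
// in this file):
//
//   int main(int argc, char** argv) {
//     return rocksdb::trace_analyzer_tool(argc, argv);
//   }
//
//   ./trace_analyzer -output_key_stats -output_time_series \
//       -convert_to_human_readable_trace -trace_path=/tmp/trace_example \
//       -output_dir=/tmp/trace_output -output_prefix=test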
1982} // namespace rocksdb
1983
1984#endif  // GFLAGS
1985#endif  // ROCKSDB_LITE