1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef __STDC_FORMAT_MACROS
11 #define __STDC_FORMAT_MACROS
12 #endif
13
14 #ifdef GFLAGS
15 #ifdef NUMA
16 #include <numa.h>
17 #include <numaif.h>
18 #endif
19 #ifndef OS_WIN
20 #include <unistd.h>
21 #endif
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <math.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <sys/types.h>
28 #include <atomic>
29 #include <condition_variable>
30 #include <cstddef>
31 #include <memory>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_map>
35
36 #include "db/db_impl.h"
37 #include "db/malloc_stats.h"
38 #include "db/version_set.h"
39 #include "hdfs/env_hdfs.h"
40 #include "monitoring/histogram.h"
41 #include "monitoring/statistics.h"
42 #include "options/cf_options.h"
43 #include "port/port.h"
44 #include "port/stack_trace.h"
45 #include "rocksdb/cache.h"
46 #include "rocksdb/db.h"
47 #include "rocksdb/env.h"
48 #include "rocksdb/filter_policy.h"
49 #include "rocksdb/memtablerep.h"
50 #include "rocksdb/options.h"
51 #include "rocksdb/perf_context.h"
52 #include "rocksdb/persistent_cache.h"
53 #include "rocksdb/rate_limiter.h"
54 #include "rocksdb/slice.h"
55 #include "rocksdb/slice_transform.h"
56 #include "rocksdb/utilities/object_registry.h"
57 #include "rocksdb/utilities/optimistic_transaction_db.h"
58 #include "rocksdb/utilities/options_util.h"
59 #include "rocksdb/utilities/sim_cache.h"
60 #include "rocksdb/utilities/transaction.h"
61 #include "rocksdb/utilities/transaction_db.h"
62 #include "rocksdb/write_batch.h"
63 #include "util/cast_util.h"
64 #include "util/compression.h"
65 #include "util/crc32c.h"
66 #include "util/gflags_compat.h"
67 #include "util/mutexlock.h"
68 #include "util/random.h"
69 #include "util/stderr_logger.h"
70 #include "util/string_util.h"
71 #include "util/testutil.h"
72 #include "util/transaction_test_util.h"
73 #include "util/xxhash.h"
74 #include "utilities/blob_db/blob_db.h"
75 #include "utilities/merge_operators.h"
76 #include "utilities/merge_operators/bytesxor.h"
77 #include "utilities/persistent_cache/block_cache_tier.h"
78
79 #ifdef OS_WIN
80 #include <io.h> // open/close
81 #endif
82
83 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
84 using GFLAGS_NAMESPACE::RegisterFlagValidator;
85 using GFLAGS_NAMESPACE::SetUsageMessage;
86
87 DEFINE_string(
88 benchmarks,
89 "fillseq,"
90 "fillseqdeterministic,"
91 "fillsync,"
92 "fillrandom,"
93 "filluniquerandomdeterministic,"
94 "overwrite,"
95 "readrandom,"
96 "newiterator,"
97 "newiteratorwhilewriting,"
98 "seekrandom,"
99 "seekrandomwhilewriting,"
100 "seekrandomwhilemerging,"
101 "readseq,"
102 "readreverse,"
103 "compact,"
104 "compactall,"
105 "multireadrandom,"
106 "mixgraph,"
107 "readseq,"
108 "readtocache,"
109 "readreverse,"
110 "readwhilewriting,"
111 "readwhilemerging,"
112 "readwhilescanning,"
113 "readrandomwriterandom,"
114 "updaterandom,"
115 "xorupdaterandom,"
116 "randomwithverify,"
117 "fill100K,"
118 "crc32c,"
119 "xxhash,"
120 "compress,"
121 "uncompress,"
122 "acquireload,"
123 "fillseekseq,"
124 "randomtransaction,"
125 "randomreplacekeys,"
126 "timeseries",
127
128 "Comma-separated list of operations to run in the specified"
129 " order. Available benchmarks:\n"
130 "\tfillseq -- write N values in sequential key"
131 " order in async mode\n"
132 "\tfillseqdeterministic -- write N values in the specified"
133 " key order and keep the shape of the LSM tree\n"
134 "\tfillrandom -- write N values in random key order in async"
135 " mode\n"
136 "\tfilluniquerandomdeterministic -- write N values in a random"
137 " key order and keep the shape of the LSM tree\n"
138 "\toverwrite -- overwrite N values in random key order in"
139 " async mode\n"
140 "\tfillsync -- write N/100 values in random key order in "
141 "sync mode\n"
142 "\tfill100K -- write N/1000 100K values in random order in"
143 " async mode\n"
144 "\tdeleteseq -- delete N keys in sequential order\n"
145 "\tdeleterandom -- delete N keys in random order\n"
146 "\treadseq -- read N times sequentially\n"
147 "\treadtocache -- 1 thread reading database sequentially\n"
148 "\treadreverse -- read N times in reverse order\n"
149 "\treadrandom -- read N times in random order\n"
150 "\treadmissing -- read N missing keys in random order\n"
151 "\treadwhilewriting -- 1 writer, N threads doing random "
152 "reads\n"
153 "\treadwhilemerging -- 1 merger, N threads doing random "
154 "reads\n"
155 "\treadwhilescanning -- 1 thread doing full table scan, "
156 "N threads doing random reads\n"
157 "\treadrandomwriterandom -- N threads doing random-read, "
158 "random-write\n"
159 "\tupdaterandom -- N threads doing read-modify-write for random "
160 "keys\n"
161 "\txorupdaterandom -- N threads doing read-XOR-write for "
162 "random keys\n"
163 "\tappendrandom -- N threads doing read-modify-write with "
164 "growing values\n"
165 "\tmergerandom -- same as updaterandom/appendrandom using merge"
166 " operator. "
167 "Must be used with merge_operator\n"
168 "\treadrandommergerandom -- perform N random read-or-merge "
169 "operations. Must be used with merge_operator\n"
170 "\tnewiterator -- repeated iterator creation\n"
171 "\tseekrandom -- N random seeks, call Next seek_nexts times "
172 "per seek\n"
173 "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
174 "overwrite\n"
175 "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
176 "merge\n"
177 "\tcrc32c -- repeated crc32c of 4K of data\n"
178 "\txxhash -- repeated xxHash of 4K of data\n"
179 "\tacquireload -- load N*1000 times\n"
180 "\tfillseekseq -- write N values in sequential key, then read "
181 "them by seeking to each key\n"
182 "\trandomtransaction -- execute N random transactions and "
183 "verify correctness\n"
184 "\trandomreplacekeys -- randomly replaces N keys by deleting "
185 "the old version and putting the new version\n\n"
186 "\ttimeseries -- 1 writer generates time series data "
187 "and multiple readers doing random reads on id\n\n"
188 "Meta operations:\n"
189 "\tcompact -- Compact the entire DB; If multiple, randomly choose one\n"
190 "\tcompactall -- Compact the entire DB\n"
191 "\tstats -- Print DB stats\n"
192 "\tresetstats -- Reset DB stats\n"
193 "\tlevelstats -- Print the number of files and bytes per level\n"
194 "\tsstables -- Print sstable info\n"
195 "\theapprofile -- Dump a heap profile (if supported by this port)\n"
196 "\treplay -- replay the trace file specified with trace_file\n");
197
198 DEFINE_int64(num, 1000000, "Number of key/values to place in database");
199
200 DEFINE_int64(numdistinct, 1000,
201 "Number of distinct keys to use. Used in RandomWithVerify to "
202 "read/write on fewer keys so that gets are more likely to find the"
203 " key and puts are more likely to update the same key");
204
205 DEFINE_int64(merge_keys, -1,
206 "Number of distinct keys to use for MergeRandom and "
207 "ReadRandomMergeRandom. "
208 "If negative, there will be FLAGS_num keys.");
209 DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");
210
211 DEFINE_int32(
212 num_hot_column_families, 0,
213 "Number of Hot Column Families. If more than 0, only write to this "
214 "number of column families. After finishing all the writes to them, "
215 "create new set of column families and insert to them. Only used "
216 "when num_column_families > 1.");
217
218 DEFINE_string(column_family_distribution, "",
219 "Comma-separated list of percentages, where the ith element "
220 "indicates the probability of an op using the ith column family. "
221 "The number of elements must be `num_hot_column_families` if "
222 "specified; otherwise, it must be `num_column_families`. The "
223 "sum of elements must be 100. E.g., if `num_column_families=4`, "
224 "and `num_hot_column_families=0`, a valid list could be "
225 "\"10,20,30,40\".");
226
227 DEFINE_int64(reads, -1, "Number of read operations to do. "
228 "If negative, do FLAGS_num reads.");
229
230 DEFINE_int64(deletes, -1, "Number of delete operations to do. "
231 "If negative, do FLAGS_num deletions.");
232
233 DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");
234
235 DEFINE_int64(seed, 0, "Seed base for random number generators. "
236 "When 0 it is deterministic.");
237
238 DEFINE_int32(threads, 1, "Number of concurrent threads to run.");
239
240 DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
241 " When 0 then num & reads determine the test duration");
242
243 DEFINE_int32(value_size, 100, "Size of each value");
244
245 DEFINE_int32(seek_nexts, 0,
246 "How many times to call Next() after Seek() in "
247 "fillseekseq, seekrandom, seekrandomwhilewriting and "
248 "seekrandomwhilemerging");
249
250 DEFINE_bool(reverse_iterator, false,
251 "When true use Prev rather than Next for iterators that do "
252 "Seek and then Next");
253
254 DEFINE_int64(max_scan_distance, 0,
255 "Used to define iterate_upper_bound (or iterate_lower_bound "
256 "if FLAGS_reverse_iterator is set to true) when value is nonzero");
257
258 DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
259
260 DEFINE_int64(batch_size, 1, "Batch size");
261
262 static bool ValidateKeySize(const char* /*flagname*/, int32_t /*value*/) {
263 return true;
264 }
265
266 static bool ValidateUint32Range(const char* flagname, uint64_t value) {
267 if (value > std::numeric_limits<uint32_t>::max()) {
268 fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
269 (unsigned long)value);
270 return false;
271 }
272 return true;
273 }
274
275 DEFINE_int32(key_size, 16, "size of each key");
276
277 DEFINE_int32(num_multi_db, 0,
278 "Number of DBs used in the benchmark. 0 means single DB.");
279
280 DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
281 " to this fraction of their original size after compression");
282
283 DEFINE_double(read_random_exp_range, 0.0,
284 "Read random's key will be generated using distribution of "
285 "num * exp(-r) where r is uniform number from 0 to this value. "
286 "The larger the number is, the more skewed the reads are. "
287 "Only used in readrandom and multireadrandom benchmarks.");
288
289 DEFINE_bool(histogram, false, "Print histogram of operation timings");
290
291 DEFINE_bool(enable_numa, false,
292 "Make operations aware of NUMA architecture and bind memory "
293 "and cpus corresponding to nodes together. In NUMA, memory "
294 "in same node as CPUs are closer when compared to memory in "
295 "other nodes. Reads can be faster when the process is bound to "
296 "CPU and memory of same node. Use \"$numactl --hardware\" command "
297 "to see NUMA memory architecture.");
298
299 DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
300 "Number of bytes to buffer in all memtables before compacting");
301
302 DEFINE_bool(cost_write_buffer_to_cache, false,
303 "The usage of memtable is costed to the block cache");
304
305 DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
306 "Number of bytes to buffer in memtable before compacting");
307
308 DEFINE_int32(max_write_buffer_number,
309 rocksdb::Options().max_write_buffer_number,
310 "The number of in-memory memtables. Each memtable is of size"
311 " write_buffer_size bytes.");
312
313 DEFINE_int32(min_write_buffer_number_to_merge,
314 rocksdb::Options().min_write_buffer_number_to_merge,
315 "The minimum number of write buffers that will be merged together"
316 "before writing to storage. This is cheap because it is an"
317 "in-memory merge. If this feature is not enabled, then all these"
318 "write buffers are flushed to L0 as separate files and this "
319 "increases read amplification because a get request has to check"
320 " in all of these files. Also, an in-memory merge may result in"
321 " writing less data to storage if there are duplicate records "
322 " in each of these individual write buffers.");
323
324 DEFINE_int32(max_write_buffer_number_to_maintain,
325 rocksdb::Options().max_write_buffer_number_to_maintain,
326 "The total maximum number of write buffers to maintain in memory "
327 "including copies of buffers that have already been flushed. "
328 "Unlike max_write_buffer_number, this parameter does not affect "
329 "flushing. This controls the minimum amount of write history "
330 "that will be available in memory for conflict checking when "
331 "Transactions are used. If this value is too low, some "
332 "transactions may fail at commit time due to not being able to "
333 "determine whether there were any write conflicts. Setting this "
334 "value to 0 will cause write buffers to be freed immediately "
335 "after they are flushed. If this value is set to -1, "
336 "'max_write_buffer_number' will be used.");
337
338 DEFINE_int32(max_background_jobs,
339 rocksdb::Options().max_background_jobs,
340 "The maximum number of concurrent background jobs that can occur "
341 "in parallel.");
342
343 DEFINE_int32(num_bottom_pri_threads, 0,
344 "The number of threads in the bottom-priority thread pool (used "
345 "by universal compaction only).");
346
347 DEFINE_int32(num_high_pri_threads, 0,
348 "The maximum number of concurrent background compactions"
349 " that can occur in parallel.");
350
351 DEFINE_int32(num_low_pri_threads, 0,
352 "The maximum number of concurrent background compactions"
353 " that can occur in parallel.");
354
355 DEFINE_int32(max_background_compactions,
356 rocksdb::Options().max_background_compactions,
357 "The maximum number of concurrent background compactions"
358 " that can occur in parallel.");
359
360 DEFINE_int32(base_background_compactions, -1, "DEPRECATED");
361
362 DEFINE_uint64(subcompactions, 1,
363 "Maximum number of subcompactions to divide L0-L1 compactions "
364 "into.");
365 static const bool FLAGS_subcompactions_dummy
366 __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_subcompactions,
367 &ValidateUint32Range);
368
369 DEFINE_int32(max_background_flushes,
370 rocksdb::Options().max_background_flushes,
371 "The maximum number of concurrent background flushes"
372 " that can occur in parallel.");
373
374 static rocksdb::CompactionStyle FLAGS_compaction_style_e;
375 DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
376 "style of compaction: level-based, universal and fifo");
377
378 static rocksdb::CompactionPri FLAGS_compaction_pri_e;
379 DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
380 "priority of files to compaction: by size or by data age");
381
382 DEFINE_int32(universal_size_ratio, 0,
383 "Percentage flexibility while comparing file size"
384 " (for universal compaction only).");
385
386 DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
387 " single compaction run (for universal compaction only).");
388
389 DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
390 " in universal style compaction");
391
392 DEFINE_int32(universal_max_size_amplification_percent, 0,
393 "The max size amplification for universal style compaction");
394
395 DEFINE_int32(universal_compression_size_percent, -1,
396 "The percentage of the database to compress for universal "
397 "compaction. -1 means compress everything.");
398
399 DEFINE_bool(universal_allow_trivial_move, false,
400 "Allow trivial move in universal compaction.");
401
402 DEFINE_int64(cache_size, 8 << 20, // 8MB
403 "Number of bytes to use as a cache of uncompressed data");
404
405 DEFINE_int32(cache_numshardbits, 6,
406 "Number of shards for the block cache"
407 " is 2 ** cache_numshardbits. Negative means use default settings."
408 " This is applied only if FLAGS_cache_size is non-negative.");
409
410 DEFINE_double(cache_high_pri_pool_ratio, 0.0,
411 "Ratio of block cache reserve for high pri blocks. "
412 "If > 0.0, we also enable "
413 "cache_index_and_filter_blocks_with_high_priority.");
414
415 DEFINE_bool(use_clock_cache, false,
416 "Replace default LRU block cache with clock cache.");
417
418 DEFINE_int64(simcache_size, -1,
419 "Number of bytes to use as a simcache of "
420 "uncompressed data. Nagative value disables simcache.");
421
422 DEFINE_bool(cache_index_and_filter_blocks, false,
423 "Cache index/filter blocks in block cache.");
424
425 DEFINE_bool(partition_index_and_filters, false,
426 "Partition index and filter blocks.");
427
428 DEFINE_bool(partition_index, false, "Partition index blocks");
429
430 DEFINE_int64(metadata_block_size,
431 rocksdb::BlockBasedTableOptions().metadata_block_size,
432 "Max partition size when partitioning index/filters");
433
434 // The default reduces the overhead of reading time with flash. With an HDD,
435 // which offers much less throughput, this number is better set to 1.
436 DEFINE_int32(ops_between_duration_checks, 1000,
437 "Check duration limit every x ops");
438
439 DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
440 "Pin index/filter blocks of L0 files in block cache.");
441
442 DEFINE_bool(
443 pin_top_level_index_and_filter, false,
444 "Pin top-level index of partitioned index/filter blocks in block cache.");
445
446 DEFINE_int32(block_size,
447 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
448 "Number of bytes in a block.");
449
450 DEFINE_int32(
451 format_version,
452 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
453 "Format version of SST files.");
454
455 DEFINE_int32(block_restart_interval,
456 rocksdb::BlockBasedTableOptions().block_restart_interval,
457 "Number of keys between restart points "
458 "for delta encoding of keys in data block.");
459
460 DEFINE_int32(index_block_restart_interval,
461 rocksdb::BlockBasedTableOptions().index_block_restart_interval,
462 "Number of keys between restart points "
463 "for delta encoding of keys in index block.");
464
465 DEFINE_int32(read_amp_bytes_per_bit,
466 rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
467 "Number of bytes per bit to be used in block read-amp bitmap");
468
469 DEFINE_bool(enable_index_compression,
470 rocksdb::BlockBasedTableOptions().enable_index_compression,
471 "Compress the index block");
472
473 DEFINE_bool(block_align, rocksdb::BlockBasedTableOptions().block_align,
474 "Align data blocks on page size");
475
476 DEFINE_bool(use_data_block_hash_index, false,
477 "if use kDataBlockBinaryAndHash "
478 "instead of kDataBlockBinarySearch. "
479 "This is valid if only we use BlockTable");
480
481 DEFINE_double(data_block_hash_table_util_ratio, 0.75,
482 "util ratio for data block hash index table. "
483 "This is only valid if use_data_block_hash_index is "
484 "set to true");
485
486 DEFINE_int64(compressed_cache_size, -1,
487 "Number of bytes to use as a cache of compressed data.");
488
489 DEFINE_int64(row_cache_size, 0,
490 "Number of bytes to use as a cache of individual rows"
491 " (0 = disabled).");
492
493 DEFINE_int32(open_files, rocksdb::Options().max_open_files,
494 "Maximum number of files to keep open at the same time"
495 " (use default if == 0)");
496
497 DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
498 "If open_files is set to -1, this option set the number of "
499 "threads that will be used to open files during DB::Open()");
500
501 DEFINE_bool(new_table_reader_for_compaction_inputs, true,
502 "If true, uses a separate file handle for compaction inputs");
503
504 DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
505
506 DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
507 "Maximum windows randomaccess buffer size");
508
509 DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
510 "Maximum write buffer for Writable File");
511
512 DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
513 " use default settings.");
514 DEFINE_double(memtable_bloom_size_ratio, 0,
515 "Ratio of memtable size used for bloom filter. 0 means no bloom "
516 "filter.");
517 DEFINE_bool(memtable_whole_key_filtering, false,
518 "Try to use whole key bloom filter in memtables.");
519 DEFINE_bool(memtable_use_huge_page, false,
520 "Try to use huge page in memtables.");
521
522 DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
523 " database. If you set this flag and also specify a benchmark that"
524 " wants a fresh database, that benchmark will fail.");
525
526 DEFINE_bool(use_existing_keys, false,
527 "If true, uses existing keys in the DB, "
528 "rather than generating new ones. This involves some startup "
529 "latency to load all keys into memory. It is supported for the "
530 "same read/overwrite benchmarks as `-use_existing_db=true`, which "
531 "must also be set for this flag to be enabled. When this flag is "
532 "set, the value for `-num` will be ignored.");
533
534 DEFINE_bool(show_table_properties, false,
535 "If true, then per-level table"
536 " properties will be printed on every stats-interval when"
537 " stats_interval is set and stats_per_interval is on.");
538
539 DEFINE_string(db, "", "Use the db with the following name.");
540
541 // Read cache flags
542
543 DEFINE_string(read_cache_path, "",
544 "If not empty string, a read cache will be used in this path");
545
546 DEFINE_int64(read_cache_size, 4LL * 1024 * 1024 * 1024,
547 "Maximum size of the read cache");
548
549 DEFINE_bool(read_cache_direct_write, true,
550 "Whether to use Direct IO for writing to the read cache");
551
552 DEFINE_bool(read_cache_direct_read, true,
553 "Whether to use Direct IO for reading from read cache");
554
555 DEFINE_bool(use_keep_filter, false, "Whether to use a noop compaction filter");
556
557 static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
558 if (value >= 20) {
559 fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
560 flagname, value);
561 return false;
562 }
563 return true;
564 }
565
566 DEFINE_bool(verify_checksum, true,
567 "Verify checksum for every block read"
568 " from storage");
569
570 DEFINE_bool(statistics, false, "Database statistics");
571 DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
572 "stats level for statistics");
573 DEFINE_string(statistics_string, "", "Serialized statistics string");
574 static class std::shared_ptr<rocksdb::Statistics> dbstats;
575
576 DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
577 " --num reads.");
578
579 DEFINE_bool(finish_after_writes, false, "Write thread terminates after all writes are finished");
580
581 DEFINE_bool(sync, false, "Sync all writes to disk");
582
583 DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
584
585 DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
586
587 DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");
588
589 DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench",
590 "Truth key/values used when using verify");
591
592 DEFINE_int32(num_levels, 7, "The total number of levels");
593
594 DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
595 "Target file size at level-1");
596
597 DEFINE_int32(target_file_size_multiplier,
598 rocksdb::Options().target_file_size_multiplier,
599 "A multiplier to compute target level-N file size (N >= 2)");
600
601 DEFINE_uint64(max_bytes_for_level_base,
602 rocksdb::Options().max_bytes_for_level_base,
603 "Max bytes for level-1");
604
605 DEFINE_bool(level_compaction_dynamic_level_bytes, false,
606 "Whether level size base is dynamic");
607
608 DEFINE_double(max_bytes_for_level_multiplier, 10,
609 "A multiplier to compute max bytes for level-N (N >= 2)");
610
611 static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
612 DEFINE_string(max_bytes_for_level_multiplier_additional, "",
613 "A vector that specifies additional fanout per level");
614
615 DEFINE_int32(level0_stop_writes_trigger,
616 rocksdb::Options().level0_stop_writes_trigger,
617 "Number of files in level-0"
618 " that will trigger put stop.");
619
620 DEFINE_int32(level0_slowdown_writes_trigger,
621 rocksdb::Options().level0_slowdown_writes_trigger,
622 "Number of files in level-0"
623 " that will slow down writes.");
624
625 DEFINE_int32(level0_file_num_compaction_trigger,
626 rocksdb::Options().level0_file_num_compaction_trigger,
627 "Number of files in level-0"
628 " when compactions start");
629
630 static bool ValidateInt32Percent(const char* flagname, int32_t value) {
631 if (value <= 0 || value>=100) {
632 fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
633 flagname, value);
634 return false;
635 }
636 return true;
637 }
638 DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
639 " as percentage) for the ReadRandomWriteRandom workload. The "
640 "default value 90 means 90% operations out of all reads and writes"
641 " operations are reads. In other words, 9 gets for every 1 put.");
642
643 DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
644 " as percentage) for the ReadRandomMergeRandom workload. The"
645 " default value 70 means 70% out of all read and merge operations"
646 " are merges. In other words, 7 merges for every 3 gets.");
647
648 DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
649 "deletes (used in RandomWithVerify only). RandomWithVerify "
650 "calculates writepercent as (100 - FLAGS_readwritepercent - "
651 "deletepercent), so deletepercent must be smaller than (100 - "
652 "FLAGS_readwritepercent)");
653
654 DEFINE_bool(optimize_filters_for_hits, false,
655 "Optimizes bloom filters for workloads for most lookups return "
656 "a value. For now this doesn't create bloom filters for the max "
657 "level of the LSM to reduce metadata that should fit in RAM. ");
658
659 DEFINE_uint64(delete_obsolete_files_period_micros, 0,
660 "Ignored. Left here for backward compatibility");
661
662 DEFINE_int64(writes_before_delete_range, 0,
663 "Number of writes before DeleteRange is called regularly.");
664
665 DEFINE_int64(writes_per_range_tombstone, 0,
666 "Number of writes between range tombstones");
667
668 DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");
669
670 DEFINE_int64(max_num_range_tombstones, 0,
671 "Maximum number of range tombstones "
672 "to insert.");
673
674 DEFINE_bool(expand_range_tombstones, false,
675 "Expand range tombstone into sequential regular tombstones.");
676
677 #ifndef ROCKSDB_LITE
678 // Transactions Options
679 DEFINE_bool(optimistic_transaction_db, false,
680 "Open a OptimisticTransactionDB instance. "
681 "Required for randomtransaction benchmark.");
682
683 DEFINE_bool(transaction_db, false,
684 "Open a TransactionDB instance. "
685 "Required for randomtransaction benchmark.");
686
687 DEFINE_uint64(transaction_sets, 2,
688 "Number of keys each transaction will "
689 "modify (use in RandomTransaction only). Max: 9999");
690
691 DEFINE_bool(transaction_set_snapshot, false,
692 "Setting to true will have each transaction call SetSnapshot()"
693 " upon creation.");
694
695 DEFINE_int32(transaction_sleep, 0,
696 "Max microseconds to sleep in between "
697 "reading and writing a value (used in RandomTransaction only). ");
698
699 DEFINE_uint64(transaction_lock_timeout, 100,
700 "If using a transaction_db, specifies the lock wait timeout in"
701 " milliseconds before failing a transaction waiting on a lock");
702 DEFINE_string(
703 options_file, "",
704 "The path to a RocksDB options file. If specified, then db_bench will "
705 "run with the RocksDB options in the default column family of the "
706 "specified options file. "
707 "Note that with this setting, db_bench will ONLY accept the following "
708 "RocksDB options related command-line arguments, all other arguments "
709 "that are related to RocksDB options will be ignored:\n"
710 "\t--use_existing_db\n"
711 "\t--use_existing_keys\n"
712 "\t--statistics\n"
713 "\t--row_cache_size\n"
714 "\t--row_cache_numshardbits\n"
715 "\t--enable_io_prio\n"
716 "\t--dump_malloc_stats\n"
717 "\t--num_multi_db\n");
718
719 // FIFO Compaction Options
720 DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0,
721 "The limit of total table file sizes to trigger FIFO compaction");
722
723 DEFINE_bool(fifo_compaction_allow_compaction, true,
724 "Allow compaction in FIFO compaction.");
725
726 DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds.");
727
728 // Blob DB Options
729 DEFINE_bool(use_blob_db, false,
730 "Open a BlobDB instance. "
731 "Required for large value benchmark.");
732
733 DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection.");
734
735 DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB.");
736
737 DEFINE_uint64(blob_db_max_db_size, 0,
738 "Max size limit of the directory where blob files are stored.");
739
740 DEFINE_uint64(blob_db_max_ttl_range, 86400,
741 "TTL range to generate BlobDB data (in seconds).");
742
743 DEFINE_uint64(blob_db_ttl_range_secs, 3600,
744 "TTL bucket size to use when creating blob files.");
745
746 DEFINE_uint64(blob_db_min_blob_size, 0,
747 "Smallest blob to store in a file. Blobs smaller than this "
748 "will be inlined with the key in the LSM tree.");
749
750 DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at.");
751
752 DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024,
753 "Target size of each blob file.");
754
755 #endif // ROCKSDB_LITE
756
757 DEFINE_bool(report_bg_io_stats, false,
758 "Measure times spents on I/Os while in compactions. ");
759
760 DEFINE_bool(use_stderr_info_logger, false,
761 "Write info logs to stderr instead of to LOG file. ");
762
763 DEFINE_string(trace_file, "", "Trace workload to a file. ");
764
765 static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
766 assert(ctype);
767
768 if (!strcasecmp(ctype, "none"))
769 return rocksdb::kNoCompression;
770 else if (!strcasecmp(ctype, "snappy"))
771 return rocksdb::kSnappyCompression;
772 else if (!strcasecmp(ctype, "zlib"))
773 return rocksdb::kZlibCompression;
774 else if (!strcasecmp(ctype, "bzip2"))
775 return rocksdb::kBZip2Compression;
776 else if (!strcasecmp(ctype, "lz4"))
777 return rocksdb::kLZ4Compression;
778 else if (!strcasecmp(ctype, "lz4hc"))
779 return rocksdb::kLZ4HCCompression;
780 else if (!strcasecmp(ctype, "xpress"))
781 return rocksdb::kXpressCompression;
782 else if (!strcasecmp(ctype, "zstd"))
783 return rocksdb::kZSTD;
784
785 fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
786 return rocksdb::kSnappyCompression; // default value
787 }
788
789 static std::string ColumnFamilyName(size_t i) {
790 if (i == 0) {
791 return rocksdb::kDefaultColumnFamilyName;
792 } else {
793 char name[100];
794 snprintf(name, sizeof(name), "column_family_name_%06zu", i);
795 return std::string(name);
796 }
797 }
798
799 DEFINE_string(compression_type, "snappy",
800 "Algorithm to use to compress the database");
801 static enum rocksdb::CompressionType FLAGS_compression_type_e =
802 rocksdb::kSnappyCompression;
803
804 DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");
805
806 DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
807 "Compression level. The meaning of this value is library-"
808 "dependent. If unset, we try to use the default for the library "
809 "specified in `--compression_type`");
810
811 DEFINE_int32(compression_max_dict_bytes,
812 rocksdb::CompressionOptions().max_dict_bytes,
813 "Maximum size of dictionary used to prime the compression "
814 "library.");
815
816 DEFINE_int32(compression_zstd_max_train_bytes,
817 rocksdb::CompressionOptions().zstd_max_train_bytes,
818 "Maximum size of training data passed to zstd's dictionary "
819 "trainer.");
820
821 DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
822 " from this level. Levels with number < min_level_to_compress are"
823 " not compressed. Otherwise, apply compression_type to "
824 "all levels.");
825
826 static bool ValidateTableCacheNumshardbits(const char* flagname,
827 int32_t value) {
828 if (0 >= value || value > 20) {
829 fprintf(stderr, "Invalid value for --%s: %d, must be 0 < val <= 20\n",
830 flagname, value);
831 return false;
832 }
833 return true;
834 }
835 DEFINE_int32(table_cache_numshardbits, 4, "");
836
837 #ifndef ROCKSDB_LITE
838 DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
839 " with --hdfs.");
840 #endif // ROCKSDB_LITE
841 DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
842 " --env_uri.");
843 static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
844
845 DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
846 "this is greater than zero. When 0 the interval grows over time.");
847
848 DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
849 "overrides stats_interval when both are > 0.");
850
851 DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
852 " this is greater than 0.");
853
854 DEFINE_int64(report_interval_seconds, 0,
855 "If greater than zero, it will write simple stats in CVS format "
856 "to --report_file every N seconds");
857
858 DEFINE_string(report_file, "report.csv",
859 "Filename where some simple stats are reported to (if "
860 "--report_interval_seconds is bigger than 0)");
861
862 DEFINE_int32(thread_status_per_interval, 0,
863 "Takes and report a snapshot of the current status of each thread"
864 " when this is greater than 0.");
865
866 DEFINE_int32(perf_level, rocksdb::PerfLevel::kDisable, "Level of perf collection");
867
868 static bool ValidateRateLimit(const char* flagname, double value) {
869 const double EPSILON = 1e-10;
870 if ( value < -EPSILON ) {
871 fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
872 flagname, value);
873 return false;
874 }
875 return true;
876 }
877 DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED");
878
879 DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");
880
881 DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024,
882 "Slowdown writes if pending compaction bytes exceed this number");
883
884 DEFINE_uint64(hard_pending_compaction_bytes_limit, 128ull * 1024 * 1024 * 1024,
885 "Stop writes if pending compaction bytes exceed this number");
886
887 DEFINE_uint64(delayed_write_rate, 8388608u,
888 "Limited bytes allowed to DB when soft_rate_limit or "
889 "level0_slowdown_writes_trigger triggers");
890
891 DEFINE_bool(enable_pipelined_write, true,
892 "Allow WAL and memtable writes to be pipelined");
893
894 DEFINE_bool(allow_concurrent_memtable_write, true,
895 "Allow multi-writers to update mem tables in parallel.");
896
897 DEFINE_bool(inplace_update_support, rocksdb::Options().inplace_update_support,
898 "Support in-place memtable update for smaller or same-size values");
899
900 DEFINE_uint64(inplace_update_num_locks,
901 rocksdb::Options().inplace_update_num_locks,
902 "Number of RW locks to protect in-place memtable updates");
903
904 DEFINE_bool(enable_write_thread_adaptive_yield, true,
905 "Use a yielding spin loop for brief writer thread waits.");
906
907 DEFINE_uint64(
908 write_thread_max_yield_usec, 100,
909 "Maximum microseconds for enable_write_thread_adaptive_yield operation.");
910
911 DEFINE_uint64(write_thread_slow_yield_usec, 3,
912 "The threshold at which a slow yield is considered a signal that "
913 "other processes or threads want the core.");
914
915 DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
916 "When hard_rate_limit is set then this is the max time a put will"
917 " be stalled.");
918
919 DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
920
921 DEFINE_bool(rate_limiter_auto_tuned, false,
922 "Enable dynamic adjustment of rate limit according to demand for "
923 "background I/O");
924
925
926 DEFINE_bool(sine_write_rate, false,
927 "Use a sine wave write_rate_limit");
928
929 DEFINE_uint64(sine_write_rate_interval_milliseconds, 10000,
930 "Interval of which the sine wave write_rate_limit is recalculated");
931
932 DEFINE_double(sine_a, 1,
933 "A in f(x) = A sin(bx + c) + d");
934
935 DEFINE_double(sine_b, 1,
936 "B in f(x) = A sin(bx + c) + d");
937
938 DEFINE_double(sine_c, 0,
939 "C in f(x) = A sin(bx + c) + d");
940
941 DEFINE_double(sine_d, 1,
942 "D in f(x) = A sin(bx + c) + d");
943
944 DEFINE_bool(rate_limit_bg_reads, false,
945 "Use options.rate_limiter on compaction reads");
946
947 DEFINE_uint64(
948 benchmark_write_rate_limit, 0,
949 "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
950 "is the global rate in bytes/second.");
951
952 // the parameters of mix_graph
953 DEFINE_double(key_dist_a, 0.0,
954 "The parameter 'a' of key access distribution model "
955 "f(x)=a*x^b");
956 DEFINE_double(key_dist_b, 0.0,
957 "The parameter 'b' of key access distribution model "
958 "f(x)=a*x^b");
959 DEFINE_double(value_theta, 0.0,
960               "The parameter 'theta' of the Generalized Pareto Distribution "
961               "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
962 DEFINE_double(value_k, 0.0,
963               "The parameter 'k' of the Generalized Pareto Distribution "
964               "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
965 DEFINE_double(value_sigma, 0.0,
966               "The parameter 'sigma' of the Generalized Pareto Distribution "
967               "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
968 DEFINE_double(iter_theta, 0.0,
969               "The parameter 'theta' of the Generalized Pareto Distribution "
970               "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
971 DEFINE_double(iter_k, 0.0,
972               "The parameter 'k' of the Generalized Pareto Distribution "
973               "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
974 DEFINE_double(iter_sigma, 0.0,
975               "The parameter 'sigma' of the Generalized Pareto Distribution "
976               "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
977 DEFINE_double(mix_get_ratio, 1.0,
978 "The ratio of Get queries of mix_graph workload");
979 DEFINE_double(mix_put_ratio, 0.0,
980 "The ratio of Put queries of mix_graph workload");
981 DEFINE_double(mix_seek_ratio, 0.0,
982 "The ratio of Seek queries of mix_graph workload");
983 DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
984 DEFINE_int64(mix_ave_kv_size, 512,
985 "The average key-value size of this workload");
986 DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
987 DEFINE_double(
988 sine_mix_rate_noise, 0.0,
989 "Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
990 DEFINE_bool(sine_mix_rate, false,
991 "Enable the sine QPS control on the mix workload");
992 DEFINE_uint64(
993 sine_mix_rate_interval_milliseconds, 10000,
994 "Interval of which the sine wave read_rate_limit is recalculated");
995 DEFINE_int64(mix_accesses, -1,
996 "The total query accesses of mix_graph workload");
997
998 DEFINE_uint64(
999 benchmark_read_rate_limit, 0,
1000 "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
1001 "is the global rate in ops/second.");
1002
1003 DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
1004 "Max bytes allowed in one compaction");
1005
1006 #ifndef ROCKSDB_LITE
1007 DEFINE_bool(readonly, false, "Run read only benchmarks.");
1008
1009 DEFINE_bool(print_malloc_stats, false,
1010 "Print malloc stats to stdout after benchmarks finish.");
1011 #endif // ROCKSDB_LITE
1012
1013 DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
1014
1015 DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
1016 DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
1017 " in MB.");
1018 DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
1019
1020 DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
1021 "Allow reads to occur via mmap-ing files");
1022
1023 DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
1024 "Allow writes to occur via mmap-ing files");
1025
1026 DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
1027 "Use O_DIRECT for reading data");
1028
1029 DEFINE_bool(use_direct_io_for_flush_and_compaction,
1030 rocksdb::Options().use_direct_io_for_flush_and_compaction,
1031 "Use O_DIRECT for background flush and compaction writes");
1032
1033 DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
1034 "Advise random access on table file open");
1035
1036 DEFINE_string(compaction_fadvice, "NORMAL",
1037 "Access pattern advice when a file is compacted");
1038 static auto FLAGS_compaction_fadvice_e =
1039 rocksdb::Options().access_hint_on_compaction_start;
1040
1041 DEFINE_bool(use_tailing_iterator, false,
1042 "Use tailing iterator to access a series of keys instead of get");
1043
1044 DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
1045 "Use adaptive mutex");
1046
1047 DEFINE_uint64(bytes_per_sync, rocksdb::Options().bytes_per_sync,
1048 "Allows OS to incrementally sync SST files to disk while they are"
1049 " being written, in the background. Issue one request for every"
1050 " bytes_per_sync written. 0 turns it off.");
1051
1052 DEFINE_uint64(wal_bytes_per_sync, rocksdb::Options().wal_bytes_per_sync,
1053 "Allows OS to incrementally sync WAL files to disk while they are"
1054 " being written, in the background. Issue one request for every"
1055 " wal_bytes_per_sync written. 0 turns it off.");
1056
1057 DEFINE_bool(use_single_deletes, true,
1058 "Use single deletes (used in RandomReplaceKeys only).");
1059
1060 DEFINE_double(stddev, 2000.0,
1061 "Standard deviation of normal distribution used for picking keys"
1062 " (used in RandomReplaceKeys only).");
1063
1064 DEFINE_int32(key_id_range, 100000,
1065 "Range of possible value of key id (used in TimeSeries only).");
1066
1067 DEFINE_string(expire_style, "none",
1068 "Style to remove expired time entries. Can be one of the options "
1069 "below: none (do not expired data), compaction_filter (use a "
1070 "compaction filter to remove expired data), delete (seek IDs and "
1071 "remove expired data) (used in TimeSeries only).");
1072
1073 DEFINE_uint64(
1074 time_range, 100000,
1075 "Range of timestamp that store in the database (used in TimeSeries"
1076 " only).");
1077
1078 DEFINE_int32(num_deletion_threads, 1,
1079 "Number of threads to do deletion (used in TimeSeries and delete "
1080 "expire_style only).");
1081
1082 DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
1083 " operations on a key in the memtable");
1084
1085 static bool ValidatePrefixSize(const char* flagname, int32_t value) {
1086 if (value < 0 || value>=2000000000) {
1087 fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
1088 flagname, value);
1089 return false;
1090 }
1091 return true;
1092 }
1093 DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
1094 "plain table");
1095 DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
1096 "per prefix, 0 means no special handling of the prefix, "
1097 "i.e. use the prefix comes with the generated random number.");
1098 DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
1099 "If non-zero, enable "
1100 "memtable insert with hint with the given prefix size.");
1101 DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
1102 "threads' IO priority");
1103 DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction "
1104 "threads' CPU priority");
1105 DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
1106 "table becomes an identity function. This is only valid when key "
1107 "is 8 bytes");
1108 DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
1109 DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
1110 "Gap between printing stats to log in seconds");
1111 DEFINE_uint64(stats_persist_period_sec,
1112 rocksdb::Options().stats_persist_period_sec,
1113 "Gap between persisting stats in seconds");
1114 DEFINE_uint64(stats_history_buffer_size,
1115 rocksdb::Options().stats_history_buffer_size,
1116 "Max number of stats snapshots to keep in memory");
1117
1118 enum RepFactory {
1119 kSkipList,
1120 kPrefixHash,
1121 kVectorRep,
1122 kHashLinkedList,
1123 };
1124
1125 static enum RepFactory StringToRepFactory(const char* ctype) {
1126 assert(ctype);
1127
1128 if (!strcasecmp(ctype, "skip_list"))
1129 return kSkipList;
1130 else if (!strcasecmp(ctype, "prefix_hash"))
1131 return kPrefixHash;
1132 else if (!strcasecmp(ctype, "vector"))
1133 return kVectorRep;
1134 else if (!strcasecmp(ctype, "hash_linkedlist"))
1135 return kHashLinkedList;
1136
1137   fprintf(stdout, "Cannot parse memtablerep %s\n", ctype);
1138 return kSkipList;
1139 }
1140
1141 static enum RepFactory FLAGS_rep_factory;
1142 DEFINE_string(memtablerep, "skip_list", "");
1143 DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
1144 DEFINE_bool(use_plain_table, false, "If true, use the plain table "
1145             "format instead of the block-based table format");
1146 DEFINE_bool(use_cuckoo_table, false, "If true, use the cuckoo table format");
1147 DEFINE_double(cuckoo_hash_ratio, 0.9, "Hash ratio for Cuckoo SST table.");
1148 DEFINE_bool(use_hash_search, false, "If true, use kHashSearch "
1149             "instead of kBinarySearch. "
1150             "This is only valid when we use BlockBasedTable");
1151 DEFINE_bool(use_block_based_filter, false, "If true, use kBlockBasedFilter "
1152             "instead of kFullFilter for the filter block. "
1153             "This is only valid when we use BlockBasedTable");
1154 DEFINE_string(merge_operator, "", "The merge operator to use with the database."
1155 "If a new merge operator is specified, be sure to use fresh"
1156 " database The possible merge operators are defined in"
1157 " utilities/merge_operators.h");
1158 DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
1159 "linear search first for this many steps from the previous "
1160 "position");
1161 DEFINE_bool(report_file_operations, false, "If true, report the number of "
1162             "file operations");
1163
1164 static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
1165 RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
1166
1167 static const bool FLAGS_hard_rate_limit_dummy __attribute__((__unused__)) =
1168 RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);
1169
1170 static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) =
1171 RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
1172
1173 static const bool FLAGS_key_size_dummy __attribute__((__unused__)) =
1174 RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);
1175
1176 static const bool FLAGS_cache_numshardbits_dummy __attribute__((__unused__)) =
1177 RegisterFlagValidator(&FLAGS_cache_numshardbits,
1178 &ValidateCacheNumshardbits);
1179
1180 static const bool FLAGS_readwritepercent_dummy __attribute__((__unused__)) =
1181 RegisterFlagValidator(&FLAGS_readwritepercent, &ValidateInt32Percent);
1182
1183 DEFINE_int32(disable_seek_compaction, false,
1184 "Not used, left here for backwards compatibility");
1185
1186 static const bool FLAGS_deletepercent_dummy __attribute__((__unused__)) =
1187 RegisterFlagValidator(&FLAGS_deletepercent, &ValidateInt32Percent);
1188 static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((__unused__)) =
1189 RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
1190 &ValidateTableCacheNumshardbits);
1191
1192 namespace rocksdb {
1193
1194 namespace {
1195 struct ReportFileOpCounters {
1196 std::atomic<int> open_counter_;
1197 std::atomic<int> read_counter_;
1198 std::atomic<int> append_counter_;
1199 std::atomic<uint64_t> bytes_read_;
1200 std::atomic<uint64_t> bytes_written_;
1201 };
1202
1203 // A special Env to record and report file operations in db_bench
1204 class ReportFileOpEnv : public EnvWrapper {
1205 public:
1206 explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }
1207
1208 void reset() {
1209 counters_.open_counter_ = 0;
1210 counters_.read_counter_ = 0;
1211 counters_.append_counter_ = 0;
1212 counters_.bytes_read_ = 0;
1213 counters_.bytes_written_ = 0;
1214 }
1215
1216 Status NewSequentialFile(const std::string& f,
1217 std::unique_ptr<SequentialFile>* r,
1218 const EnvOptions& soptions) override {
1219 class CountingFile : public SequentialFile {
1220 private:
1221 std::unique_ptr<SequentialFile> target_;
1222 ReportFileOpCounters* counters_;
1223
1224 public:
1225 CountingFile(std::unique_ptr<SequentialFile>&& target,
1226 ReportFileOpCounters* counters)
1227 : target_(std::move(target)), counters_(counters) {}
1228
1229 Status Read(size_t n, Slice* result, char* scratch) override {
1230 counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
1231 Status rv = target_->Read(n, result, scratch);
1232 counters_->bytes_read_.fetch_add(result->size(),
1233 std::memory_order_relaxed);
1234 return rv;
1235 }
1236
1237 Status Skip(uint64_t n) override { return target_->Skip(n); }
1238 };
1239
1240 Status s = target()->NewSequentialFile(f, r, soptions);
1241 if (s.ok()) {
1242 counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
1243 r->reset(new CountingFile(std::move(*r), counters()));
1244 }
1245 return s;
1246 }
1247
1248 Status NewRandomAccessFile(const std::string& f,
1249 std::unique_ptr<RandomAccessFile>* r,
1250 const EnvOptions& soptions) override {
1251 class CountingFile : public RandomAccessFile {
1252 private:
1253 std::unique_ptr<RandomAccessFile> target_;
1254 ReportFileOpCounters* counters_;
1255
1256 public:
1257 CountingFile(std::unique_ptr<RandomAccessFile>&& target,
1258 ReportFileOpCounters* counters)
1259 : target_(std::move(target)), counters_(counters) {}
1260 Status Read(uint64_t offset, size_t n, Slice* result,
1261 char* scratch) const override {
1262 counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
1263 Status rv = target_->Read(offset, n, result, scratch);
1264 counters_->bytes_read_.fetch_add(result->size(),
1265 std::memory_order_relaxed);
1266 return rv;
1267 }
1268 };
1269
1270 Status s = target()->NewRandomAccessFile(f, r, soptions);
1271 if (s.ok()) {
1272 counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
1273 r->reset(new CountingFile(std::move(*r), counters()));
1274 }
1275 return s;
1276 }
1277
1278 Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
1279 const EnvOptions& soptions) override {
1280 class CountingFile : public WritableFile {
1281 private:
1282 std::unique_ptr<WritableFile> target_;
1283 ReportFileOpCounters* counters_;
1284
1285 public:
1286 CountingFile(std::unique_ptr<WritableFile>&& target,
1287 ReportFileOpCounters* counters)
1288 : target_(std::move(target)), counters_(counters) {}
1289
1290 Status Append(const Slice& data) override {
1291 counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
1292 Status rv = target_->Append(data);
1293 counters_->bytes_written_.fetch_add(data.size(),
1294 std::memory_order_relaxed);
1295 return rv;
1296 }
1297
1298 Status Truncate(uint64_t size) override { return target_->Truncate(size); }
1299 Status Close() override { return target_->Close(); }
1300 Status Flush() override { return target_->Flush(); }
1301 Status Sync() override { return target_->Sync(); }
1302 };
1303
1304 Status s = target()->NewWritableFile(f, r, soptions);
1305 if (s.ok()) {
1306 counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
1307 r->reset(new CountingFile(std::move(*r), counters()));
1308 }
1309 return s;
1310 }
1311
1312 // getter
1313 ReportFileOpCounters* counters() { return &counters_; }
1314
1315 private:
1316 ReportFileOpCounters counters_;
1317 };
1318
1319 } // namespace
1320
1321 // Helper for quickly generating random data.
1322 class RandomGenerator {
1323 private:
1324 std::string data_;
1325 unsigned int pos_;
1326
1327 public:
1328 RandomGenerator() {
1329 // We use a limited amount of data over and over again and ensure
1330 // that it is larger than the compression window (32KB), and also
1331 // large enough to serve all typical value sizes we want to write.
1332 Random rnd(301);
1333 std::string piece;
1334 while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
1335 // Add a short fragment that is as compressible as specified
1336 // by FLAGS_compression_ratio.
1337 test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
1338 data_.append(piece);
1339 }
1340 pos_ = 0;
1341 }
1342
1343 Slice Generate(unsigned int len) {
1344 assert(len <= data_.size());
1345 if (pos_ + len > data_.size()) {
1346 pos_ = 0;
1347 }
1348 pos_ += len;
1349 return Slice(data_.data() + pos_ - len, len);
1350 }
1351
1352 Slice GenerateWithTTL(unsigned int len) {
1353 assert(len <= data_.size());
1354 if (pos_ + len > data_.size()) {
1355 pos_ = 0;
1356 }
1357 pos_ += len;
1358 return Slice(data_.data() + pos_ - len, len);
1359 }
1360 };
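// A hypothetical snippet showing how the generator above is typically used:
// construct it and slice values out of its shared, pre-generated buffer, e.g.
//
//   RandomGenerator gen;
//   Slice value = gen.Generate(static_cast<unsigned int>(FLAGS_value_size));
//
// Successive calls return different windows of the same compressible data, so
// the values compress to roughly FLAGS_compression_ratio of their size.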
1361
1362 static void AppendWithSpace(std::string* str, Slice msg) {
1363 if (msg.empty()) return;
1364 if (!str->empty()) {
1365 str->push_back(' ');
1366 }
1367 str->append(msg.data(), msg.size());
1368 }
1369
1370 struct DBWithColumnFamilies {
1371 std::vector<ColumnFamilyHandle*> cfh;
1372 DB* db;
1373 #ifndef ROCKSDB_LITE
1374 OptimisticTransactionDB* opt_txn_db;
1375 #endif // ROCKSDB_LITE
1376 std::atomic<size_t> num_created; // Need to be updated after all the
1377 // new entries in cfh are set.
1378 size_t num_hot; // Number of column families to be queried at each moment.
1379                   // After each CreateNewCf(), another num_hot new
1380                   // column families will be created and queried.
1381 port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf()
1382 std::vector<int> cfh_idx_to_prob; // ith index holds probability of operating
1383 // on cfh[i].
1384
1385 DBWithColumnFamilies()
1386 : db(nullptr)
1387 #ifndef ROCKSDB_LITE
1388 , opt_txn_db(nullptr)
1389 #endif // ROCKSDB_LITE
1390 {
1391 cfh.clear();
1392 num_created = 0;
1393 num_hot = 0;
1394 }
1395
1396 DBWithColumnFamilies(const DBWithColumnFamilies& other)
1397 : cfh(other.cfh),
1398 db(other.db),
1399 #ifndef ROCKSDB_LITE
1400 opt_txn_db(other.opt_txn_db),
1401 #endif // ROCKSDB_LITE
1402 num_created(other.num_created.load()),
1403 num_hot(other.num_hot),
1404 cfh_idx_to_prob(other.cfh_idx_to_prob) {
1405 }
1406
1407 void DeleteDBs() {
1408 std::for_each(cfh.begin(), cfh.end(),
1409 [](ColumnFamilyHandle* cfhi) { delete cfhi; });
1410 cfh.clear();
1411 #ifndef ROCKSDB_LITE
1412 if (opt_txn_db) {
1413 delete opt_txn_db;
1414 opt_txn_db = nullptr;
1415 } else {
1416 delete db;
1417 db = nullptr;
1418 }
1419 #else
1420 delete db;
1421 db = nullptr;
1422 #endif // ROCKSDB_LITE
1423 }
1424
1425 ColumnFamilyHandle* GetCfh(int64_t rand_num) {
1426 assert(num_hot > 0);
1427 size_t rand_offset = 0;
1428 if (!cfh_idx_to_prob.empty()) {
1429 assert(cfh_idx_to_prob.size() == num_hot);
1430 int sum = 0;
1431 while (sum + cfh_idx_to_prob[rand_offset] < rand_num % 100) {
1432 sum += cfh_idx_to_prob[rand_offset];
1433 ++rand_offset;
1434 }
1435 assert(rand_offset < cfh_idx_to_prob.size());
1436 } else {
1437 rand_offset = rand_num % num_hot;
1438 }
1439 return cfh[num_created.load(std::memory_order_acquire) - num_hot +
1440 rand_offset];
1441 }
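  // Worked example for GetCfh() above: with cfh_idx_to_prob = {10, 20, 30, 40}
  // and rand_num % 100 == 35, the loop accumulates 10, then 30, and stops
  // because 30 + 30 = 60 is not < 35, selecting rand_offset == 2 (the third of
  // the currently hot column families).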
1442
1443   // stage: assume CFs in [0, stage * num_hot) have been created. Need to
1444   // create CFs in [stage * num_hot, (stage + 1) * num_hot).
1445 void CreateNewCf(ColumnFamilyOptions options, int64_t stage) {
1446 MutexLock l(&create_cf_mutex);
1447 if ((stage + 1) * num_hot <= num_created) {
1448 // Already created.
1449 return;
1450 }
1451 auto new_num_created = num_created + num_hot;
1452 assert(new_num_created <= cfh.size());
1453 for (size_t i = num_created; i < new_num_created; i++) {
1454 Status s =
1455 db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i]));
1456 if (!s.ok()) {
1457 fprintf(stderr, "create column family error: %s\n",
1458 s.ToString().c_str());
1459 abort();
1460 }
1461 }
1462 num_created.store(new_num_created, std::memory_order_release);
1463 }
1464 };
1465
1466 // A class that reports stats to a CSV file
1467 class ReporterAgent {
1468 public:
1469 ReporterAgent(Env* env, const std::string& fname,
1470 uint64_t report_interval_secs)
1471 : env_(env),
1472 total_ops_done_(0),
1473 last_report_(0),
1474 report_interval_secs_(report_interval_secs),
1475 stop_(false) {
1476 auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
1477 if (s.ok()) {
1478 s = report_file_->Append(Header() + "\n");
1479 }
1480 if (s.ok()) {
1481 s = report_file_->Flush();
1482 }
1483 if (!s.ok()) {
1484 fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
1485 s.ToString().c_str());
1486 abort();
1487 }
1488
1489 reporting_thread_ = port::Thread([&]() { SleepAndReport(); });
1490 }
1491
1492 ~ReporterAgent() {
1493 {
1494 std::unique_lock<std::mutex> lk(mutex_);
1495 stop_ = true;
1496 stop_cv_.notify_all();
1497 }
1498 reporting_thread_.join();
1499 }
1500
1501 // thread safe
1502 void ReportFinishedOps(int64_t num_ops) {
1503 total_ops_done_.fetch_add(num_ops);
1504 }
1505
1506 private:
1507 std::string Header() const { return "secs_elapsed,interval_qps"; }
1508 void SleepAndReport() {
1509 uint64_t kMicrosInSecond = 1000 * 1000;
1510 auto time_started = env_->NowMicros();
1511 while (true) {
1512 {
1513 std::unique_lock<std::mutex> lk(mutex_);
1514 if (stop_ ||
1515 stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
1516 [&]() { return stop_; })) {
1517 // stopping
1518 break;
1519 }
1520 // else -> timeout, which means time for a report!
1521 }
1522 auto total_ops_done_snapshot = total_ops_done_.load();
1523 // round the seconds elapsed
1524 auto secs_elapsed =
1525 (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
1526 kMicrosInSecond;
1527 std::string report = ToString(secs_elapsed) + "," +
1528 ToString(total_ops_done_snapshot - last_report_) +
1529 "\n";
1530 auto s = report_file_->Append(report);
1531 if (s.ok()) {
1532 s = report_file_->Flush();
1533 }
1534 if (!s.ok()) {
1535 fprintf(stderr,
1536 "Can't write to report file (%s), stopping the reporting\n",
1537 s.ToString().c_str());
1538 break;
1539 }
1540 last_report_ = total_ops_done_snapshot;
1541 }
1542 }
1543
1544 Env* env_;
1545 std::unique_ptr<WritableFile> report_file_;
1546 std::atomic<int64_t> total_ops_done_;
1547 int64_t last_report_;
1548 const uint64_t report_interval_secs_;
1549 rocksdb::port::Thread reporting_thread_;
1550 std::mutex mutex_;
1551 // will notify on stop
1552 std::condition_variable stop_cv_;
1553 bool stop_;
1554 };
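// Illustrative sketch (not part of the original source): with
// --report_interval_seconds the agent appends one CSV line per interval, the
// second column being the number of ops finished in that interval
// (hypothetical numbers):
//
//   secs_elapsed,interval_qps
//   1,103440
//   2,98734
//
// Worker threads only call ReportFinishedOps(); the background thread owns
// report_file_ and computes the per-interval delta of total_ops_done_.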
1555
1556 enum OperationType : unsigned char {
1557 kRead = 0,
1558 kWrite,
1559 kDelete,
1560 kSeek,
1561 kMerge,
1562 kUpdate,
1563 kCompress,
1564 kUncompress,
1565 kCrc,
1566 kHash,
1567 kOthers
1568 };
1569
1570 static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
1571 OperationTypeString = {
1572 {kRead, "read"},
1573 {kWrite, "write"},
1574 {kDelete, "delete"},
1575 {kSeek, "seek"},
1576 {kMerge, "merge"},
1577 {kUpdate, "update"},
1578 {kCompress, "compress"},
1579 {kUncompress, "uncompress"},
1580 {kCrc, "crc"},
1581 {kHash, "hash"},
1582 {kOthers, "op"}
1583 };
1584
1585 class CombinedStats;
1586 class Stats {
1587 private:
1588 int id_;
1589 uint64_t start_;
1590 uint64_t sine_interval_;
1591 uint64_t finish_;
1592 double seconds_;
1593 uint64_t done_;
1594 uint64_t last_report_done_;
1595 uint64_t next_report_;
1596 uint64_t bytes_;
1597 uint64_t last_op_finish_;
1598 uint64_t last_report_finish_;
1599 std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
1600 std::hash<unsigned char>> hist_;
1601 std::string message_;
1602 bool exclude_from_merge_;
1603 ReporterAgent* reporter_agent_; // does not own
1604 friend class CombinedStats;
1605
1606 public:
1607 Stats() { Start(-1); }
1608
1609 void SetReporterAgent(ReporterAgent* reporter_agent) {
1610 reporter_agent_ = reporter_agent;
1611 }
1612
1613 void Start(int id) {
1614 id_ = id;
1615 next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
1616 last_op_finish_ = start_;
1617 hist_.clear();
1618 done_ = 0;
1619 last_report_done_ = 0;
1620 bytes_ = 0;
1621 seconds_ = 0;
1622 start_ = FLAGS_env->NowMicros();
1623 sine_interval_ = FLAGS_env->NowMicros();
1624 finish_ = start_;
1625 last_report_finish_ = start_;
1626 message_.clear();
1627 // When set, stats from this thread won't be merged with others.
1628 exclude_from_merge_ = false;
1629 }
1630
1631 void Merge(const Stats& other) {
1632 if (other.exclude_from_merge_)
1633 return;
1634
1635 for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
1636 auto this_it = hist_.find(it->first);
1637 if (this_it != hist_.end()) {
1638 this_it->second->Merge(*(other.hist_.at(it->first)));
1639 } else {
1640 hist_.insert({ it->first, it->second });
1641 }
1642 }
1643
1644 done_ += other.done_;
1645 bytes_ += other.bytes_;
1646 seconds_ += other.seconds_;
1647 if (other.start_ < start_) start_ = other.start_;
1648 if (other.finish_ > finish_) finish_ = other.finish_;
1649
1650 // Just keep the messages from one thread
1651 if (message_.empty()) message_ = other.message_;
1652 }
1653
1654 void Stop() {
1655 finish_ = FLAGS_env->NowMicros();
1656 seconds_ = (finish_ - start_) * 1e-6;
1657 }
1658
1659 void AddMessage(Slice msg) {
1660 AppendWithSpace(&message_, msg);
1661 }
1662
1663 void SetId(int id) { id_ = id; }
1664 void SetExcludeFromMerge() { exclude_from_merge_ = true; }
1665
1666 void PrintThreadStatus() {
1667 std::vector<ThreadStatus> thread_list;
1668 FLAGS_env->GetThreadList(&thread_list);
1669
1670 fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
1671 "ThreadID", "ThreadType", "cfName", "Operation",
1672 "ElapsedTime", "Stage", "State", "OperationProperties");
1673
1674 int64_t current_time = 0;
1675 Env::Default()->GetCurrentTime(&current_time);
1676 for (auto ts : thread_list) {
1677 fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
1678 ts.thread_id,
1679 ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
1680 ts.cf_name.c_str(),
1681 ThreadStatus::GetOperationName(ts.operation_type).c_str(),
1682 ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(),
1683 ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
1684 ThreadStatus::GetStateName(ts.state_type).c_str());
1685
1686 auto op_properties = ThreadStatus::InterpretOperationProperties(
1687 ts.operation_type, ts.op_properties);
1688 for (const auto& op_prop : op_properties) {
1689 fprintf(stderr, " %s %" PRIu64" |",
1690 op_prop.first.c_str(), op_prop.second);
1691 }
1692 fprintf(stderr, "\n");
1693 }
1694 }
1695
1696 void ResetSineInterval() {
1697 sine_interval_ = FLAGS_env->NowMicros();
1698 }
1699
1700 uint64_t GetSineInterval() {
1701 return sine_interval_;
1702 }
1703
1704 uint64_t GetStart() {
1705 return start_;
1706 }
1707
1708 void ResetLastOpTime() {
1709 // Set to now to avoid latency from calls to SleepForMicroseconds
1710 last_op_finish_ = FLAGS_env->NowMicros();
1711 }
1712
1713 void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
1714 enum OperationType op_type = kOthers) {
1715 if (reporter_agent_) {
1716 reporter_agent_->ReportFinishedOps(num_ops);
1717 }
1718 if (FLAGS_histogram) {
1719 uint64_t now = FLAGS_env->NowMicros();
1720 uint64_t micros = now - last_op_finish_;
1721
1722 if (hist_.find(op_type) == hist_.end())
1723 {
1724 auto hist_temp = std::make_shared<HistogramImpl>();
1725 hist_.insert({op_type, std::move(hist_temp)});
1726 }
1727 hist_[op_type]->Add(micros);
1728
1729 if (micros > 20000 && !FLAGS_stats_interval) {
1730 fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
1731 fflush(stderr);
1732 }
1733 last_op_finish_ = now;
1734 }
1735
1736 done_ += num_ops;
1737 if (done_ >= next_report_) {
1738 if (!FLAGS_stats_interval) {
1739 if (next_report_ < 1000) next_report_ += 100;
1740 else if (next_report_ < 5000) next_report_ += 500;
1741 else if (next_report_ < 10000) next_report_ += 1000;
1742 else if (next_report_ < 50000) next_report_ += 5000;
1743 else if (next_report_ < 100000) next_report_ += 10000;
1744 else if (next_report_ < 500000) next_report_ += 50000;
1745 else next_report_ += 100000;
1746 fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
1747 } else {
1748 uint64_t now = FLAGS_env->NowMicros();
1749 int64_t usecs_since_last = now - last_report_finish_;
1750
1751 // Determine whether to print status: the reporting interval is either
1752 // every N operations or every N seconds.
1753
1754 if (FLAGS_stats_interval_seconds &&
1755 usecs_since_last < (FLAGS_stats_interval_seconds * 1000000)) {
1756 // Don't check again for this many operations
1757 next_report_ += FLAGS_stats_interval;
1758
1759 } else {
1760
1761 fprintf(stderr,
1762 "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
1763 "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
1764 FLAGS_env->TimeToString(now/1000000).c_str(),
1765 id_,
1766 done_ - last_report_done_, done_,
1767 (done_ - last_report_done_) /
1768 (usecs_since_last / 1000000.0),
1769 done_ / ((now - start_) / 1000000.0),
1770 (now - last_report_finish_) / 1000000.0,
1771 (now - start_) / 1000000.0);
1772
1773 if (id_ == 0 && FLAGS_stats_per_interval) {
1774 std::string stats;
1775
1776 if (db_with_cfh && db_with_cfh->num_created.load()) {
1777 for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) {
1778 if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
1779 &stats))
1780 fprintf(stderr, "%s\n", stats.c_str());
1781 if (FLAGS_show_table_properties) {
1782 for (int level = 0; level < FLAGS_num_levels; ++level) {
1783 if (db->GetProperty(
1784 db_with_cfh->cfh[i],
1785 "rocksdb.aggregated-table-properties-at-level" +
1786 ToString(level),
1787 &stats)) {
1788 if (stats.find("# entries=0") == std::string::npos) {
1789 fprintf(stderr, "Level[%d]: %s\n", level,
1790 stats.c_str());
1791 }
1792 }
1793 }
1794 }
1795 }
1796 } else if (db) {
1797 if (db->GetProperty("rocksdb.stats", &stats)) {
1798 fprintf(stderr, "%s\n", stats.c_str());
1799 }
1800 if (FLAGS_show_table_properties) {
1801 for (int level = 0; level < FLAGS_num_levels; ++level) {
1802 if (db->GetProperty(
1803 "rocksdb.aggregated-table-properties-at-level" +
1804 ToString(level),
1805 &stats)) {
1806 if (stats.find("# entries=0") == std::string::npos) {
1807 fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str());
1808 }
1809 }
1810 }
1811 }
1812 }
1813 }
1814
1815 next_report_ += FLAGS_stats_interval;
1816 last_report_finish_ = now;
1817 last_report_done_ = done_;
1818 }
1819 }
1820 if (id_ == 0 && FLAGS_thread_status_per_interval) {
1821 PrintThreadStatus();
1822 }
1823 fflush(stderr);
1824 }
1825 }
1826
1827 void AddBytes(int64_t n) {
1828 bytes_ += n;
1829 }
1830
1831 void Report(const Slice& name) {
1832 // Pretend at least one op was done in case we are running a benchmark
1833 // that does not call FinishedOps().
1834 if (done_ < 1) done_ = 1;
1835
1836 std::string extra;
1837 if (bytes_ > 0) {
1838 // Rate is computed on actual elapsed time, not the sum of per-thread
1839 // elapsed times.
1840 double elapsed = (finish_ - start_) * 1e-6;
1841 char rate[100];
1842 snprintf(rate, sizeof(rate), "%6.1f MB/s",
1843 (bytes_ / 1048576.0) / elapsed);
1844 extra = rate;
1845 }
1846 AppendWithSpace(&extra, message_);
1847 double elapsed = (finish_ - start_) * 1e-6;
1848 double throughput = (double)done_/elapsed;
1849
1850 fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
1851 name.ToString().c_str(),
1852 seconds_ * 1e6 / done_,
1853 (long)throughput,
1854 (extra.empty() ? "" : " "),
1855 extra.c_str());
1856 if (FLAGS_histogram) {
1857 for (auto it = hist_.begin(); it != hist_.end(); ++it) {
1858 fprintf(stdout, "Microseconds per %s:\n%s\n",
1859 OperationTypeString[it->first].c_str(),
1860 it->second->ToString().c_str());
1861 }
1862 }
1863 if (FLAGS_report_file_operations) {
1864 ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
1865 ReportFileOpCounters* counters = env->counters();
1866 fprintf(stdout, "Num files opened: %d\n",
1867 counters->open_counter_.load(std::memory_order_relaxed));
1868 fprintf(stdout, "Num Read(): %d\n",
1869 counters->read_counter_.load(std::memory_order_relaxed));
1870 fprintf(stdout, "Num Append(): %d\n",
1871 counters->append_counter_.load(std::memory_order_relaxed));
1872 fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
1873 counters->bytes_read_.load(std::memory_order_relaxed));
1874 fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
1875 counters->bytes_written_.load(std::memory_order_relaxed));
1876 env->reset();
1877 }
1878 fflush(stdout);
1879 }
1880 };
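// Illustrative sketch (not part of the original source): with --histogram each
// thread's Stats keeps one HistogramImpl per OperationType; the main thread
// then folds them together before printing, roughly:
//
//   Stats merged;
//   merged.Merge(thread_a_stats);   // sums done_, bytes_ and merges hist_
//   merged.Merge(thread_b_stats);
//   merged.Report("readrandom");    // prints "Microseconds per read: ..."
//
// (thread_a_stats / thread_b_stats are hypothetical names; RunBenchmark below
// does the equivalent over its ThreadState array.)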
1881
1882 class CombinedStats {
1883 public:
1884 void AddStats(const Stats& stat) {
1885 uint64_t total_ops = stat.done_;
1886 uint64_t total_bytes = stat.bytes_;
1887 double elapsed;
1888
1889 if (total_ops < 1) {
1890 total_ops = 1;
1891 }
1892
1893 elapsed = (stat.finish_ - stat.start_) * 1e-6;
1894 throughput_ops_.emplace_back(total_ops / elapsed);
1895
1896 if (total_bytes > 0) {
1897 double mbs = (total_bytes / 1048576.0);
1898 throughput_mbs_.emplace_back(mbs / elapsed);
1899 }
1900 }
1901
1902 void Report(const std::string& bench_name) {
1903 const char* name = bench_name.c_str();
1904 int num_runs = static_cast<int>(throughput_ops_.size());
1905
1906 if (throughput_mbs_.size() == throughput_ops_.size()) {
1907 fprintf(stdout,
1908 "%s [AVG %d runs] : %d ops/sec; %6.1f MB/sec\n"
1909 "%s [MEDIAN %d runs] : %d ops/sec; %6.1f MB/sec\n",
1910 name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)),
1911 CalcAvg(throughput_mbs_), name, num_runs,
1912 static_cast<int>(CalcMedian(throughput_ops_)),
1913 CalcMedian(throughput_mbs_));
1914 } else {
1915 fprintf(stdout,
1916 "%s [AVG %d runs] : %d ops/sec\n"
1917 "%s [MEDIAN %d runs] : %d ops/sec\n",
1918 name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)), name,
1919 num_runs, static_cast<int>(CalcMedian(throughput_ops_)));
1920 }
1921 }
1922
1923 private:
1924 double CalcAvg(std::vector<double> data) {
1925 double avg = 0;
1926 for (double x : data) {
1927 avg += x;
1928 }
1929 avg = avg / data.size();
1930 return avg;
1931 }
1932
1933 double CalcMedian(std::vector<double> data) {
1934 assert(data.size() > 0);
1935 std::sort(data.begin(), data.end());
1936
1937 size_t mid = data.size() / 2;
1938 if (data.size() % 2 == 1) {
1939 // Odd number of entries
1940 return data[mid];
1941 } else {
1942 // Even number of entries
1943 return (data[mid] + data[mid - 1]) / 2;
1944 }
1945 }
1946
1947 std::vector<double> throughput_ops_;
1948 std::vector<double> throughput_mbs_;
1949 };
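// Illustrative sketch (not part of the original source): CombinedStats
// aggregates repeated runs (the X<n> benchmark suffix). For hypothetical
// per-run throughputs of {101000, 99000, 107000} ops/sec it would print
// approximately:
//
//   readrandom [AVG 3 runs] : 102333 ops/sec
//   readrandom [MEDIAN 3 runs] : 101000 ops/sec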
1950
1951 class TimestampEmulator {
1952 private:
1953 std::atomic<uint64_t> timestamp_;
1954
1955 public:
1956 TimestampEmulator() : timestamp_(0) {}
1957 uint64_t Get() const { return timestamp_.load(); }
1958 void Inc() { timestamp_++; }
1959 };
1960
1961 // State shared by all concurrent executions of the same benchmark.
1962 struct SharedState {
1963 port::Mutex mu;
1964 port::CondVar cv;
1965 int total;
1966 int perf_level;
1967 std::shared_ptr<RateLimiter> write_rate_limiter;
1968 std::shared_ptr<RateLimiter> read_rate_limiter;
1969
1970 // Each thread goes through the following states:
1971 // (1) initializing
1972 // (2) waiting for others to be initialized
1973 // (3) running
1974 // (4) done
1975
1976 long num_initialized;
1977 long num_done;
1978 bool start;
1979
1980 SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
1981 };
1982
1983 // Per-thread state for concurrent executions of the same benchmark.
1984 struct ThreadState {
1985 int tid; // 0..n-1 when running in n threads
1986 Random64 rand; // Has different seeds for different threads
1987 Stats stats;
1988 SharedState* shared;
1989
1990 /* implicit */ ThreadState(int index)
1991 : tid(index),
1992 rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
1993 }
1994 };
1995
1996 class Duration {
1997 public:
1998 Duration(uint64_t max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) {
1999 max_seconds_ = max_seconds;
2000 max_ops_= max_ops;
2001 ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops;
2002 ops_ = 0;
2003 start_at_ = FLAGS_env->NowMicros();
2004 }
2005
2006 int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; }
2007
2008 bool Done(int64_t increment) {
2009 if (increment <= 0) increment = 1; // avoid Done(0) and infinite loops
2010 ops_ += increment;
2011
2012 if (max_seconds_) {
2013 // Recheck every approx. 1000 ops (exact iff increment is a factor of 1000)
2014 auto granularity = FLAGS_ops_between_duration_checks;
2015 if ((ops_ / granularity) != ((ops_ - increment) / granularity)) {
2016 uint64_t now = FLAGS_env->NowMicros();
2017 return ((now - start_at_) / 1000000) >= max_seconds_;
2018 } else {
2019 return false;
2020 }
2021 } else {
2022 return ops_ > max_ops_;
2023 }
2024 }
2025
2026 private:
2027 uint64_t max_seconds_;
2028 int64_t max_ops_;
2029 int64_t ops_per_stage_;
2030 int64_t ops_;
2031 uint64_t start_at_;
2032 };
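// Illustrative sketch (not part of the original source): benchmarks typically
// bound their loop with a Duration, limited either by --duration seconds or by
// an operation count, e.g.:
//
//   Duration duration(FLAGS_duration, reads_);
//   while (!duration.Done(1)) {
//     // ... perform one operation ...
//   }
//
// With a non-zero --duration the op count is ignored and the wall clock is
// re-checked roughly every FLAGS_ops_between_duration_checks operations.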
2033
2034 class Benchmark {
2035 private:
2036 std::shared_ptr<Cache> cache_;
2037 std::shared_ptr<Cache> compressed_cache_;
2038 std::shared_ptr<const FilterPolicy> filter_policy_;
2039 const SliceTransform* prefix_extractor_;
2040 DBWithColumnFamilies db_;
2041 std::vector<DBWithColumnFamilies> multi_dbs_;
2042 int64_t num_;
2043 int value_size_;
2044 int key_size_;
2045 int prefix_size_;
2046 int64_t keys_per_prefix_;
2047 int64_t entries_per_batch_;
2048 int64_t writes_before_delete_range_;
2049 int64_t writes_per_range_tombstone_;
2050 int64_t range_tombstone_width_;
2051 int64_t max_num_range_tombstones_;
2052 WriteOptions write_options_;
2053 Options open_options_; // keep options around to properly destroy db later
2054 #ifndef ROCKSDB_LITE
2055 TraceOptions trace_options_;
2056 #endif
2057 int64_t reads_;
2058 int64_t deletes_;
2059 double read_random_exp_range_;
2060 int64_t writes_;
2061 int64_t readwrites_;
2062 int64_t merge_keys_;
2063 bool report_file_operations_;
2064 bool use_blob_db_;
2065 std::vector<std::string> keys_;
2066
2067 class ErrorHandlerListener : public EventListener {
2068 public:
2069 #ifndef ROCKSDB_LITE
2070 ErrorHandlerListener()
2071 : mutex_(),
2072 cv_(&mutex_),
2073 no_auto_recovery_(false),
2074 recovery_complete_(false) {}
2075
2076 ~ErrorHandlerListener() override {}
2077
2078 void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
2079 Status /*bg_error*/,
2080 bool* auto_recovery) override {
2081 if (*auto_recovery && no_auto_recovery_) {
2082 *auto_recovery = false;
2083 }
2084 }
2085
2086 void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
2087 InstrumentedMutexLock l(&mutex_);
2088 recovery_complete_ = true;
2089 cv_.SignalAll();
2090 }
2091
2092 bool WaitForRecovery(uint64_t /*abs_time_us*/) {
2093 InstrumentedMutexLock l(&mutex_);
2094 if (!recovery_complete_) {
2095 cv_.Wait(/*abs_time_us*/);
2096 }
2097 if (recovery_complete_) {
2098 recovery_complete_ = false;
2099 return true;
2100 }
2101 return false;
2102 }
2103
2104 void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
2105
2106 private:
2107 InstrumentedMutex mutex_;
2108 InstrumentedCondVar cv_;
2109 bool no_auto_recovery_;
2110 bool recovery_complete_;
2111 #else // ROCKSDB_LITE
2112 bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
2113 void EnableAutoRecovery(bool /*enable*/) {}
2114 #endif // ROCKSDB_LITE
2115 };
2116
2117 std::shared_ptr<ErrorHandlerListener> listener_;
2118
2119 bool SanityCheck() {
2120 if (FLAGS_compression_ratio > 1) {
2121 fprintf(stderr, "compression_ratio should be between 0 and 1\n");
2122 return false;
2123 }
2124 return true;
2125 }
2126
2127 inline bool CompressSlice(const CompressionInfo& compression_info,
2128 const Slice& input, std::string* compressed) {
2129 bool ok = true;
2130 switch (FLAGS_compression_type_e) {
2131 case rocksdb::kSnappyCompression:
2132 ok = Snappy_Compress(compression_info, input.data(), input.size(),
2133 compressed);
2134 break;
2135 case rocksdb::kZlibCompression:
2136 ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
2137 compressed);
2138 break;
2139 case rocksdb::kBZip2Compression:
2140 ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
2141 compressed);
2142 break;
2143 case rocksdb::kLZ4Compression:
2144 ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
2145 compressed);
2146 break;
2147 case rocksdb::kLZ4HCCompression:
2148 ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
2149 compressed);
2150 break;
2151 case rocksdb::kXpressCompression:
2152 ok = XPRESS_Compress(input.data(),
2153 input.size(), compressed);
2154 break;
2155 case rocksdb::kZSTD:
2156 ok = ZSTD_Compress(compression_info, input.data(), input.size(),
2157 compressed);
2158 break;
2159 default:
2160 ok = false;
2161 }
2162 return ok;
2163 }
2164
2165 void PrintHeader() {
2166 PrintEnvironment();
2167 fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
2168 fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
2169 FLAGS_value_size,
2170 static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
2171 fprintf(stdout, "Entries: %" PRIu64 "\n", num_);
2172 fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size);
2173 fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_);
2174 fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
2175 ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
2176 / 1048576.0));
2177 fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
2178 (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
2179 * num_)
2180 / 1048576.0));
2181 fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
2182 FLAGS_benchmark_write_rate_limit);
2183 fprintf(stdout, "Read rate: %" PRIu64 " ops/second\n",
2184 FLAGS_benchmark_read_rate_limit);
2185 if (FLAGS_enable_numa) {
2186 fprintf(stderr, "Running in NUMA enabled mode.\n");
2187 #ifndef NUMA
2188 fprintf(stderr, "NUMA is not defined in the system.\n");
2189 exit(1);
2190 #else
2191 if (numa_available() == -1) {
2192 fprintf(stderr, "NUMA is not supported by the system.\n");
2193 exit(1);
2194 }
2195 #endif
2196 }
2197
2198 auto compression = CompressionTypeToString(FLAGS_compression_type_e);
2199 fprintf(stdout, "Compression: %s\n", compression.c_str());
2200 fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
2201 FLAGS_sample_for_compression);
2202
2203 switch (FLAGS_rep_factory) {
2204 case kPrefixHash:
2205 fprintf(stdout, "Memtablerep: prefix_hash\n");
2206 break;
2207 case kSkipList:
2208 fprintf(stdout, "Memtablerep: skip_list\n");
2209 break;
2210 case kVectorRep:
2211 fprintf(stdout, "Memtablerep: vector\n");
2212 break;
2213 case kHashLinkedList:
2214 fprintf(stdout, "Memtablerep: hash_linkedlist\n");
2215 break;
2216 }
2217 fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
2218
2219 PrintWarnings(compression.c_str());
2220 fprintf(stdout, "------------------------------------------------\n");
2221 }
2222
2223 void PrintWarnings(const char* compression) {
2224 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
2225 fprintf(stdout,
2226 "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
2227 );
2228 #endif
2229 #ifndef NDEBUG
2230 fprintf(stdout,
2231 "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
2232 #endif
2233 if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
2234 // The test string should not be too small.
2235 const int len = FLAGS_block_size;
2236 std::string input_str(len, 'y');
2237 std::string compressed;
2238 CompressionOptions opts;
2239 CompressionContext context(FLAGS_compression_type_e);
2240 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
2241 FLAGS_compression_type_e,
2242 FLAGS_sample_for_compression);
2243 bool result = CompressSlice(info, Slice(input_str), &compressed);
2244
2245 if (!result) {
2246 fprintf(stdout, "WARNING: %s compression is not enabled\n",
2247 compression);
2248 } else if (compressed.size() >= input_str.size()) {
2249 fprintf(stdout, "WARNING: %s compression is not effective\n",
2250 compression);
2251 }
2252 }
2253 }
2254
2255 // Currently the following isn't equivalent to OS_LINUX.
2256 #if defined(__linux)
2257 static Slice TrimSpace(Slice s) {
2258 unsigned int start = 0;
2259 while (start < s.size() && isspace(s[start])) {
2260 start++;
2261 }
2262 unsigned int limit = static_cast<unsigned int>(s.size());
2263 while (limit > start && isspace(s[limit-1])) {
2264 limit--;
2265 }
2266 return Slice(s.data() + start, limit - start);
2267 }
2268 #endif
2269
2270 void PrintEnvironment() {
2271 fprintf(stderr, "RocksDB: version %d.%d\n",
2272 kMajorVersion, kMinorVersion);
2273
2274 #if defined(__linux)
2275 time_t now = time(nullptr);
2276 char buf[52];
2277 // Lint complains about ctime() usage, so replace it with ctime_r(). The
2278 // requirement is to provide a buffer which is at least 26 bytes.
2279 fprintf(stderr, "Date: %s",
2280 ctime_r(&now, buf)); // ctime_r() adds newline
2281
2282 FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
2283 if (cpuinfo != nullptr) {
2284 char line[1000];
2285 int num_cpus = 0;
2286 std::string cpu_type;
2287 std::string cache_size;
2288 while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
2289 const char* sep = strchr(line, ':');
2290 if (sep == nullptr) {
2291 continue;
2292 }
2293 Slice key = TrimSpace(Slice(line, sep - 1 - line));
2294 Slice val = TrimSpace(Slice(sep + 1));
2295 if (key == "model name") {
2296 ++num_cpus;
2297 cpu_type = val.ToString();
2298 } else if (key == "cache size") {
2299 cache_size = val.ToString();
2300 }
2301 }
2302 fclose(cpuinfo);
2303 fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
2304 fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
2305 }
2306 #endif
2307 }
2308
2309 static bool KeyExpired(const TimestampEmulator* timestamp_emulator,
2310 const Slice& key) {
2311 const char* pos = key.data();
2312 pos += 8;
2313 uint64_t timestamp = 0;
2314 if (port::kLittleEndian) {
2315 int bytes_to_fill = 8;
2316 for (int i = 0; i < bytes_to_fill; ++i) {
2317 timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
2318 << ((bytes_to_fill - i - 1) << 3));
2319 }
2320 } else {
2321 memcpy(&timestamp, pos, sizeof(timestamp));
2322 }
2323 return timestamp_emulator->Get() - timestamp > FLAGS_time_range;
2324 }
2325
2326 class ExpiredTimeFilter : public CompactionFilter {
2327 public:
2328 explicit ExpiredTimeFilter(
2329 const std::shared_ptr<TimestampEmulator>& timestamp_emulator)
2330 : timestamp_emulator_(timestamp_emulator) {}
2331 bool Filter(int /*level*/, const Slice& key,
2332 const Slice& /*existing_value*/, std::string* /*new_value*/,
2333 bool* /*value_changed*/) const override {
2334 return KeyExpired(timestamp_emulator_.get(), key);
2335 }
2336 const char* Name() const override { return "ExpiredTimeFilter"; }
2337
2338 private:
2339 std::shared_ptr<TimestampEmulator> timestamp_emulator_;
2340 };
2341
2342 class KeepFilter : public CompactionFilter {
2343 public:
2344 bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
2345 std::string* /*new_value*/,
2346 bool* /*value_changed*/) const override {
2347 return false;
2348 }
2349
2350 const char* Name() const override { return "KeepFilter"; }
2351 };
2352
2353 std::shared_ptr<Cache> NewCache(int64_t capacity) {
2354 if (capacity <= 0) {
2355 return nullptr;
2356 }
2357 if (FLAGS_use_clock_cache) {
2358 auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
2359 if (!cache) {
2360 fprintf(stderr, "Clock cache not supported.");
2361 exit(1);
2362 }
2363 return cache;
2364 } else {
2365 return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits,
2366 false /*strict_capacity_limit*/,
2367 FLAGS_cache_high_pri_pool_ratio);
2368 }
2369 }
2370
2371 public:
2372 Benchmark()
2373 : cache_(NewCache(FLAGS_cache_size)),
2374 compressed_cache_(NewCache(FLAGS_compressed_cache_size)),
2375 filter_policy_(FLAGS_bloom_bits >= 0
2376 ? NewBloomFilterPolicy(FLAGS_bloom_bits,
2377 FLAGS_use_block_based_filter)
2378 : nullptr),
2379 prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
2380 num_(FLAGS_num),
2381 value_size_(FLAGS_value_size),
2382 key_size_(FLAGS_key_size),
2383 prefix_size_(FLAGS_prefix_size),
2384 keys_per_prefix_(FLAGS_keys_per_prefix),
2385 entries_per_batch_(1),
2386 reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
2387 read_random_exp_range_(0.0),
2388 writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
2389 readwrites_(
2390 (FLAGS_writes < 0 && FLAGS_reads < 0)
2391 ? FLAGS_num
2392 : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)),
2393 merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
2394 report_file_operations_(FLAGS_report_file_operations),
2395 #ifndef ROCKSDB_LITE
2396 use_blob_db_(FLAGS_use_blob_db)
2397 #else
2398 use_blob_db_(false)
2399 #endif // !ROCKSDB_LITE
2400 {
2401 // use simcache instead of cache
2402 if (FLAGS_simcache_size >= 0) {
2403 if (FLAGS_cache_numshardbits >= 1) {
2404 cache_ =
2405 NewSimCache(cache_, FLAGS_simcache_size, FLAGS_cache_numshardbits);
2406 } else {
2407 cache_ = NewSimCache(cache_, FLAGS_simcache_size, 0);
2408 }
2409 }
2410
2411 if (report_file_operations_) {
2412 if (!FLAGS_hdfs.empty()) {
2413 fprintf(stderr,
2414 "--hdfs and --report_file_operations cannot be enabled "
2415 "at the same time");
2416 exit(1);
2417 }
2418 FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
2419 }
2420
2421 if (FLAGS_prefix_size > FLAGS_key_size) {
2422 fprintf(stderr, "prefix size is larger than key size");
2423 exit(1);
2424 }
2425
2426 std::vector<std::string> files;
2427 FLAGS_env->GetChildren(FLAGS_db, &files);
2428 for (size_t i = 0; i < files.size(); i++) {
2429 if (Slice(files[i]).starts_with("heap-")) {
2430 FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
2431 }
2432 }
2433 if (!FLAGS_use_existing_db) {
2434 Options options;
2435 if (!FLAGS_wal_dir.empty()) {
2436 options.wal_dir = FLAGS_wal_dir;
2437 }
2438 #ifndef ROCKSDB_LITE
2439 if (use_blob_db_) {
2440 blob_db::DestroyBlobDB(FLAGS_db, options, blob_db::BlobDBOptions());
2441 }
2442 #endif // !ROCKSDB_LITE
2443 DestroyDB(FLAGS_db, options);
2444 if (!FLAGS_wal_dir.empty()) {
2445 FLAGS_env->DeleteDir(FLAGS_wal_dir);
2446 }
2447
2448 if (FLAGS_num_multi_db > 1) {
2449 FLAGS_env->CreateDir(FLAGS_db);
2450 if (!FLAGS_wal_dir.empty()) {
2451 FLAGS_env->CreateDir(FLAGS_wal_dir);
2452 }
2453 }
2454 }
2455
2456 listener_.reset(new ErrorHandlerListener());
2457 }
2458
2459 ~Benchmark() {
2460 db_.DeleteDBs();
2461 delete prefix_extractor_;
2462 if (cache_.get() != nullptr) {
2463 // this will leak, but we're shutting down so nobody cares
2464 cache_->DisownData();
2465 }
2466 }
2467
2468 Slice AllocateKey(std::unique_ptr<const char[]>* key_guard) {
2469 char* data = new char[key_size_];
2470 const char* const_data = data;
2471 key_guard->reset(const_data);
2472 return Slice(key_guard->get(), key_size_);
2473 }
2474
2475 // Generate key according to the given specification and random number.
2476 // The resulting key will have the following format (if keys_per_prefix_
2477 // is positive), extra trailing bytes are either cut off or padded with '0'.
2478 // The prefix value is derived from key value.
2479 // ----------------------------
2480 // | prefix 00000 | key 00000 |
2481 // ----------------------------
2482 // If keys_per_prefix_ is 0, the key is simply a binary representation of
2483 // random number followed by trailing '0's
2484 // ----------------------------
2485 // | key 00000 |
2486 // ----------------------------
2487 void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
2488 if (!keys_.empty()) {
2489 assert(FLAGS_use_existing_keys);
2490 assert(keys_.size() == static_cast<size_t>(num_keys));
2491 assert(v < static_cast<uint64_t>(num_keys));
2492 *key = keys_[v];
2493 return;
2494 }
2495 char* start = const_cast<char*>(key->data());
2496 char* pos = start;
2497 if (keys_per_prefix_ > 0) {
2498 int64_t num_prefix = num_keys / keys_per_prefix_;
2499 int64_t prefix = v % num_prefix;
2500 int bytes_to_fill = std::min(prefix_size_, 8);
2501 if (port::kLittleEndian) {
2502 for (int i = 0; i < bytes_to_fill; ++i) {
2503 pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
2504 }
2505 } else {
2506 memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
2507 }
2508 if (prefix_size_ > 8) {
2509 // fill the rest with 0s
2510 memset(pos + 8, '0', prefix_size_ - 8);
2511 }
2512 pos += prefix_size_;
2513 }
2514
2515 int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
2516 if (port::kLittleEndian) {
2517 for (int i = 0; i < bytes_to_fill; ++i) {
2518 pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
2519 }
2520 } else {
2521 memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
2522 }
2523 pos += bytes_to_fill;
2524 if (key_size_ > pos - start) {
2525 memset(pos, '0', key_size_ - (pos - start));
2526 }
2527 }
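// Illustrative sketch (not part of the original source): with hypothetical
// flags --key_size=16 --prefix_size=8 --keys_per_prefix=2 and num_keys = 100,
// the key generated for v = 7 is laid out as
//
//   bytes [0, 8)  : prefix = v % (num_keys / keys_per_prefix) = 7 % 50 = 7,
//                   written most-significant byte first
//   bytes [8, 16) : v = 7, written most-significant byte first
//
// and a larger key_size would be padded with trailing '0' characters.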
2528
2529 std::string GetPathForMultiple(std::string base_name, size_t id) {
2530 if (!base_name.empty()) {
2531 #ifndef OS_WIN
2532 if (base_name.back() != '/') {
2533 base_name += '/';
2534 }
2535 #else
2536 if (base_name.back() != '\\') {
2537 base_name += '\\';
2538 }
2539 #endif
2540 }
2541 return base_name + ToString(id);
2542 }
2543
2544 void VerifyDBFromDB(std::string& truth_db_name) {
2545 DBWithColumnFamilies truth_db;
2546 auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
2547 if (!s.ok()) {
2548 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2549 exit(1);
2550 }
2551 ReadOptions ro;
2552 ro.total_order_seek = true;
2553 std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
2554 std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
2555 // Verify that all the key/values in truth_db are retrievable in db with ::Get
2556 fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
2557 for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
2558 std::string value;
2559 s = db_.db->Get(ro, truth_iter->key(), &value);
2560 assert(s.ok());
2561 // TODO(myabandeh): provide debugging hints
2562 assert(Slice(value) == truth_iter->value());
2563 }
2564 // Verify that the db iterator does not give any extra key/value
2565 fprintf(stderr, "Verifying db == truth_db...\n");
2566 for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next(), truth_iter->Next()) {
2567 assert(truth_iter->Valid());
2568 assert(truth_iter->value() == db_iter->value());
2569 }
2570 // No keys should be left unchecked in truth_db
2571 assert(!truth_iter->Valid());
2572 fprintf(stderr, "...Verified\n");
2573 }
2574
2575 void Run() {
2576 if (!SanityCheck()) {
2577 exit(1);
2578 }
2579 Open(&open_options_);
2580 PrintHeader();
2581 std::stringstream benchmark_stream(FLAGS_benchmarks);
2582 std::string name;
2583 std::unique_ptr<ExpiredTimeFilter> filter;
2584 while (std::getline(benchmark_stream, name, ',')) {
2585 // Sanitize parameters
2586 num_ = FLAGS_num;
2587 reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
2588 writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
2589 deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
2590 value_size_ = FLAGS_value_size;
2591 key_size_ = FLAGS_key_size;
2592 entries_per_batch_ = FLAGS_batch_size;
2593 writes_before_delete_range_ = FLAGS_writes_before_delete_range;
2594 writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
2595 range_tombstone_width_ = FLAGS_range_tombstone_width;
2596 max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
2597 write_options_ = WriteOptions();
2598 read_random_exp_range_ = FLAGS_read_random_exp_range;
2599 if (FLAGS_sync) {
2600 write_options_.sync = true;
2601 }
2602 write_options_.disableWAL = FLAGS_disable_wal;
2603
2604 void (Benchmark::*method)(ThreadState*) = nullptr;
2605 void (Benchmark::*post_process_method)() = nullptr;
2606
2607 bool fresh_db = false;
2608 int num_threads = FLAGS_threads;
2609
2610 int num_repeat = 1;
2611 int num_warmup = 0;
2612 if (!name.empty() && *name.rbegin() == ']') {
2613 auto it = name.find('[');
2614 if (it == std::string::npos) {
2615 fprintf(stderr, "unknown benchmark arguments '%s'\n", name.c_str());
2616 exit(1);
2617 }
2618 std::string args = name.substr(it + 1);
2619 args.resize(args.size() - 1);
2620 name.resize(it);
2621
2622 std::string bench_arg;
2623 std::stringstream args_stream(args);
2624 while (std::getline(args_stream, bench_arg, '-')) {
2625 if (bench_arg.empty()) {
2626 continue;
2627 }
2628 if (bench_arg[0] == 'X') {
2629 // Repeat the benchmark n times
2630 std::string num_str = bench_arg.substr(1);
2631 num_repeat = std::stoi(num_str);
2632 } else if (bench_arg[0] == 'W') {
2633 // Warm up the benchmark for n times
2634 std::string num_str = bench_arg.substr(1);
2635 num_warmup = std::stoi(num_str);
2636 }
2637 }
2638 }
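// Illustrative sketch (not part of the original source): a benchmark name may
// carry repeat/warmup suffixes, e.g. --benchmarks="readrandom[X3-W1]" runs one
// warmup pass and then three passes whose results are aggregated by
// CombinedStats further below.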
2639
2640 // Both fillseqdeterministic and filluniquerandomdeterministic
2641 // fill the levels except the max level with UNIQUE_RANDOM
2642 // and fill the max level with fillseq and filluniquerandom, respectively
2643 if (name == "fillseqdeterministic" ||
2644 name == "filluniquerandomdeterministic") {
2645 if (!FLAGS_disable_auto_compactions) {
2646 fprintf(stderr,
2647 "Please disable_auto_compactions in FillDeterministic "
2648 "benchmark\n");
2649 exit(1);
2650 }
2651 if (num_threads > 1) {
2652 fprintf(stderr,
2653 "filldeterministic multithreaded not supported"
2654 ", use 1 thread\n");
2655 num_threads = 1;
2656 }
2657 fresh_db = true;
2658 if (name == "fillseqdeterministic") {
2659 method = &Benchmark::WriteSeqDeterministic;
2660 } else {
2661 method = &Benchmark::WriteUniqueRandomDeterministic;
2662 }
2663 } else if (name == "fillseq") {
2664 fresh_db = true;
2665 method = &Benchmark::WriteSeq;
2666 } else if (name == "fillbatch") {
2667 fresh_db = true;
2668 entries_per_batch_ = 1000;
2669 method = &Benchmark::WriteSeq;
2670 } else if (name == "fillrandom") {
2671 fresh_db = true;
2672 method = &Benchmark::WriteRandom;
2673 } else if (name == "filluniquerandom") {
2674 fresh_db = true;
2675 if (num_threads > 1) {
2676 fprintf(stderr,
2677 "filluniquerandom multithreaded not supported"
2678 ", use 1 thread");
2679 num_threads = 1;
2680 }
2681 method = &Benchmark::WriteUniqueRandom;
2682 } else if (name == "overwrite") {
2683 method = &Benchmark::WriteRandom;
2684 } else if (name == "fillsync") {
2685 fresh_db = true;
2686 num_ /= 1000;
2687 write_options_.sync = true;
2688 method = &Benchmark::WriteRandom;
2689 } else if (name == "fill100K") {
2690 fresh_db = true;
2691 num_ /= 1000;
2692 value_size_ = 100 * 1000;
2693 method = &Benchmark::WriteRandom;
2694 } else if (name == "readseq") {
2695 method = &Benchmark::ReadSequential;
2696 } else if (name == "readtocache") {
2697 method = &Benchmark::ReadSequential;
2698 num_threads = 1;
2699 reads_ = num_;
2700 } else if (name == "readreverse") {
2701 method = &Benchmark::ReadReverse;
2702 } else if (name == "readrandom") {
2703 method = &Benchmark::ReadRandom;
2704 } else if (name == "readrandomfast") {
2705 method = &Benchmark::ReadRandomFast;
2706 } else if (name == "multireadrandom") {
2707 fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
2708 entries_per_batch_);
2709 method = &Benchmark::MultiReadRandom;
2710 } else if (name == "mixgraph") {
2711 method = &Benchmark::MixGraph;
2712 } else if (name == "readmissing") {
2713 ++key_size_;
2714 method = &Benchmark::ReadRandom;
2715 } else if (name == "newiterator") {
2716 method = &Benchmark::IteratorCreation;
2717 } else if (name == "newiteratorwhilewriting") {
2718 num_threads++; // Add extra thread for writing
2719 method = &Benchmark::IteratorCreationWhileWriting;
2720 } else if (name == "seekrandom") {
2721 method = &Benchmark::SeekRandom;
2722 } else if (name == "seekrandomwhilewriting") {
2723 num_threads++; // Add extra thread for writing
2724 method = &Benchmark::SeekRandomWhileWriting;
2725 } else if (name == "seekrandomwhilemerging") {
2726 num_threads++; // Add extra thread for merging
2727 method = &Benchmark::SeekRandomWhileMerging;
2728 } else if (name == "readrandomsmall") {
2729 reads_ /= 1000;
2730 method = &Benchmark::ReadRandom;
2731 } else if (name == "deleteseq") {
2732 method = &Benchmark::DeleteSeq;
2733 } else if (name == "deleterandom") {
2734 method = &Benchmark::DeleteRandom;
2735 } else if (name == "readwhilewriting") {
2736 num_threads++; // Add extra thread for writing
2737 method = &Benchmark::ReadWhileWriting;
2738 } else if (name == "readwhilemerging") {
2739 num_threads++; // Add extra thread for writing
2740 method = &Benchmark::ReadWhileMerging;
2741 } else if (name == "readwhilescanning") {
2742 num_threads++; // Add extra thread for scanning
2743 method = &Benchmark::ReadWhileScanning;
2744 } else if (name == "readrandomwriterandom") {
2745 method = &Benchmark::ReadRandomWriteRandom;
2746 } else if (name == "readrandommergerandom") {
2747 if (FLAGS_merge_operator.empty()) {
2748 fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
2749 name.c_str());
2750 exit(1);
2751 }
2752 method = &Benchmark::ReadRandomMergeRandom;
2753 } else if (name == "updaterandom") {
2754 method = &Benchmark::UpdateRandom;
2755 } else if (name == "xorupdaterandom") {
2756 method = &Benchmark::XORUpdateRandom;
2757 } else if (name == "appendrandom") {
2758 method = &Benchmark::AppendRandom;
2759 } else if (name == "mergerandom") {
2760 if (FLAGS_merge_operator.empty()) {
2761 fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
2762 name.c_str());
2763 exit(1);
2764 }
2765 method = &Benchmark::MergeRandom;
2766 } else if (name == "randomwithverify") {
2767 method = &Benchmark::RandomWithVerify;
2768 } else if (name == "fillseekseq") {
2769 method = &Benchmark::WriteSeqSeekSeq;
2770 } else if (name == "compact") {
2771 method = &Benchmark::Compact;
2772 } else if (name == "compactall") {
2773 CompactAll();
2774 } else if (name == "crc32c") {
2775 method = &Benchmark::Crc32c;
2776 } else if (name == "xxhash") {
2777 method = &Benchmark::xxHash;
2778 } else if (name == "acquireload") {
2779 method = &Benchmark::AcquireLoad;
2780 } else if (name == "compress") {
2781 method = &Benchmark::Compress;
2782 } else if (name == "uncompress") {
2783 method = &Benchmark::Uncompress;
2784 #ifndef ROCKSDB_LITE
2785 } else if (name == "randomtransaction") {
2786 method = &Benchmark::RandomTransaction;
2787 post_process_method = &Benchmark::RandomTransactionVerify;
2788 #endif // ROCKSDB_LITE
2789 } else if (name == "randomreplacekeys") {
2790 fresh_db = true;
2791 method = &Benchmark::RandomReplaceKeys;
2792 } else if (name == "timeseries") {
2793 timestamp_emulator_.reset(new TimestampEmulator());
2794 if (FLAGS_expire_style == "compaction_filter") {
2795 filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
2796 fprintf(stdout, "Compaction filter is used to remove expired data");
2797 open_options_.compaction_filter = filter.get();
2798 }
2799 fresh_db = true;
2800 method = &Benchmark::TimeSeries;
2801 } else if (name == "stats") {
2802 PrintStats("rocksdb.stats");
2803 } else if (name == "resetstats") {
2804 ResetStats();
2805 } else if (name == "verify") {
2806 VerifyDBFromDB(FLAGS_truth_db);
2807 } else if (name == "levelstats") {
2808 PrintStats("rocksdb.levelstats");
2809 } else if (name == "sstables") {
2810 PrintStats("rocksdb.sstables");
2811 } else if (name == "replay") {
2812 if (num_threads > 1) {
2813 fprintf(stderr, "Multi-threaded replay is not yet supported\n");
2814 exit(1);
2815 }
2816 if (FLAGS_trace_file == "") {
2817 fprintf(stderr, "Please set --trace_file to be replayed from\n");
2818 exit(1);
2819 }
2820 method = &Benchmark::Replay;
2821 } else if (!name.empty()) { // No error message for empty name
2822 fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
2823 exit(1);
2824 }
2825
2826 if (fresh_db) {
2827 if (FLAGS_use_existing_db) {
2828 fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
2829 name.c_str());
2830 method = nullptr;
2831 } else {
2832 if (db_.db != nullptr) {
2833 db_.DeleteDBs();
2834 DestroyDB(FLAGS_db, open_options_);
2835 }
2836 Options options = open_options_;
2837 for (size_t i = 0; i < multi_dbs_.size(); i++) {
2838 delete multi_dbs_[i].db;
2839 if (!open_options_.wal_dir.empty()) {
2840 options.wal_dir = GetPathForMultiple(open_options_.wal_dir, i);
2841 }
2842 DestroyDB(GetPathForMultiple(FLAGS_db, i), options);
2843 }
2844 multi_dbs_.clear();
2845 }
2846 Open(&open_options_); // use open_options for the last accessed
2847 }
2848
2849 if (method != nullptr) {
2850 fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2851
2852 #ifndef ROCKSDB_LITE
2853 // A trace_file option can be provided for both tracing and replaying
2854 // operations, but db_bench does not yet support tracing and replaying
2855 // at the same time. So, start tracing only when this benchmark is not
2856 // a replay.
2857 if (FLAGS_trace_file != "" && name != "replay") {
2858 std::unique_ptr<TraceWriter> trace_writer;
2859 Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
2860 FLAGS_trace_file, &trace_writer);
2861 if (!s.ok()) {
2862 fprintf(stderr, "Encountered an error starting a trace, %s\n",
2863 s.ToString().c_str());
2864 exit(1);
2865 }
2866 s = db_.db->StartTrace(trace_options_, std::move(trace_writer));
2867 if (!s.ok()) {
2868 fprintf(stderr, "Encountered an error starting a trace, %s\n",
2869 s.ToString().c_str());
2870 exit(1);
2871 }
2872 fprintf(stdout, "Tracing the workload to: [%s]\n",
2873 FLAGS_trace_file.c_str());
2874 }
2875 #endif // ROCKSDB_LITE
2876
2877 if (num_warmup > 0) {
2878 printf("Warming up benchmark by running %d times\n", num_warmup);
2879 }
2880
2881 for (int i = 0; i < num_warmup; i++) {
2882 RunBenchmark(num_threads, name, method);
2883 }
2884
2885 if (num_repeat > 1) {
2886 printf("Running benchmark for %d times\n", num_repeat);
2887 }
2888
2889 CombinedStats combined_stats;
2890 for (int i = 0; i < num_repeat; i++) {
2891 Stats stats = RunBenchmark(num_threads, name, method);
2892 combined_stats.AddStats(stats);
2893 }
2894 if (num_repeat > 1) {
2895 combined_stats.Report(name);
2896 }
2897 }
2898 if (post_process_method != nullptr) {
2899 (this->*post_process_method)();
2900 }
2901 }
2902
2903 #ifndef ROCKSDB_LITE
2904 if (name != "replay" && FLAGS_trace_file != "") {
2905 Status s = db_.db->EndTrace();
2906 if (!s.ok()) {
2907 fprintf(stderr, "Encountered an error ending the trace, %s\n",
2908 s.ToString().c_str());
2909 }
2910 }
2911 #endif // ROCKSDB_LITE
2912
2913 if (FLAGS_statistics) {
2914 fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
2915 }
2916 if (FLAGS_simcache_size >= 0) {
2917 fprintf(stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
2918 static_cast_with_check<SimCache, Cache>(cache_.get())
2919 ->ToString()
2920 .c_str());
2921 }
2922 }
2923
2924 private:
2925 std::shared_ptr<TimestampEmulator> timestamp_emulator_;
2926
2927 struct ThreadArg {
2928 Benchmark* bm;
2929 SharedState* shared;
2930 ThreadState* thread;
2931 void (Benchmark::*method)(ThreadState*);
2932 };
2933
2934 static void ThreadBody(void* v) {
2935 ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
2936 SharedState* shared = arg->shared;
2937 ThreadState* thread = arg->thread;
2938 {
2939 MutexLock l(&shared->mu);
2940 shared->num_initialized++;
2941 if (shared->num_initialized >= shared->total) {
2942 shared->cv.SignalAll();
2943 }
2944 while (!shared->start) {
2945 shared->cv.Wait();
2946 }
2947 }
2948
2949 SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
2950 perf_context.EnablePerLevelPerfContext();
2951 thread->stats.Start(thread->tid);
2952 (arg->bm->*(arg->method))(thread);
2953 thread->stats.Stop();
2954
2955 {
2956 MutexLock l(&shared->mu);
2957 shared->num_done++;
2958 if (shared->num_done >= shared->total) {
2959 shared->cv.SignalAll();
2960 }
2961 }
2962 }
2963
2964 Stats RunBenchmark(int n, Slice name,
2965 void (Benchmark::*method)(ThreadState*)) {
2966 SharedState shared;
2967 shared.total = n;
2968 shared.num_initialized = 0;
2969 shared.num_done = 0;
2970 shared.start = false;
2971 if (FLAGS_benchmark_write_rate_limit > 0) {
2972 shared.write_rate_limiter.reset(
2973 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
2974 }
2975 if (FLAGS_benchmark_read_rate_limit > 0) {
2976 shared.read_rate_limiter.reset(NewGenericRateLimiter(
2977 FLAGS_benchmark_read_rate_limit, 100000 /* refill_period_us */,
2978 10 /* fairness */, RateLimiter::Mode::kReadsOnly));
2979 }
2980
2981 std::unique_ptr<ReporterAgent> reporter_agent;
2982 if (FLAGS_report_interval_seconds > 0) {
2983 reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
2984 FLAGS_report_interval_seconds));
2985 }
2986
2987 ThreadArg* arg = new ThreadArg[n];
2988
2989 for (int i = 0; i < n; i++) {
2990 #ifdef NUMA
2991 if (FLAGS_enable_numa) {
2992 // Performs a local allocation of memory to threads in numa node.
2993 int n_nodes = numa_num_task_nodes(); // Number of nodes in NUMA.
2994 numa_exit_on_error = 1;
2995 int numa_node = i % n_nodes;
2996 bitmask* nodes = numa_allocate_nodemask();
2997 numa_bitmask_clearall(nodes);
2998 numa_bitmask_setbit(nodes, numa_node);
2999 // The numa_bind() call binds the calling process to the node, and the
3000 // binding is inherited by the thread created by the StartThread call
3001 // later in the loop.
3002 numa_bind(nodes);
3003 numa_set_strict(1);
3004 numa_free_nodemask(nodes);
3005 }
3006 #endif
3007 arg[i].bm = this;
3008 arg[i].method = method;
3009 arg[i].shared = &shared;
3010 arg[i].thread = new ThreadState(i);
3011 arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
3012 arg[i].thread->shared = &shared;
3013 FLAGS_env->StartThread(ThreadBody, &arg[i]);
3014 }
3015
3016 shared.mu.Lock();
3017 while (shared.num_initialized < n) {
3018 shared.cv.Wait();
3019 }
3020
3021 shared.start = true;
3022 shared.cv.SignalAll();
3023 while (shared.num_done < n) {
3024 shared.cv.Wait();
3025 }
3026 shared.mu.Unlock();
3027
3028 // Stats for some threads can be excluded.
3029 Stats merge_stats;
3030 for (int i = 0; i < n; i++) {
3031 merge_stats.Merge(arg[i].thread->stats);
3032 }
3033 merge_stats.Report(name);
3034
3035 for (int i = 0; i < n; i++) {
3036 delete arg[i].thread;
3037 }
3038 delete[] arg;
3039
3040 return merge_stats;
3041 }
3042
3043 void Crc32c(ThreadState* thread) {
3044 // Checksum about 500MB of data total
3045 const int size = FLAGS_block_size; // use --block_size option for db_bench
3046 std::string labels = "(" + ToString(FLAGS_block_size) + " per op)";
3047 const char* label = labels.c_str();
3048
3049 std::string data(size, 'x');
3050 int64_t bytes = 0;
3051 uint32_t crc = 0;
3052 while (bytes < 500 * 1048576) {
3053 crc = crc32c::Value(data.data(), size);
3054 thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
3055 bytes += size;
3056 }
3057 // Print so result is not dead
3058 fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
3059
3060 thread->stats.AddBytes(bytes);
3061 thread->stats.AddMessage(label);
3062 }
3063
3064 void xxHash(ThreadState* thread) {
3065 // Checksum about 500MB of data total
3066 const int size = 4096;
3067 const char* label = "(4K per op)";
3068 std::string data(size, 'x');
3069 int64_t bytes = 0;
3070 unsigned int xxh32 = 0;
3071 while (bytes < 500 * 1048576) {
3072 xxh32 = XXH32(data.data(), size, 0);
3073 thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
3074 bytes += size;
3075 }
3076 // Print so result is not dead
3077 fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32));
3078
3079 thread->stats.AddBytes(bytes);
3080 thread->stats.AddMessage(label);
3081 }
3082
3083 void AcquireLoad(ThreadState* thread) {
3084 int dummy;
3085 std::atomic<void*> ap(&dummy);
3086 int count = 0;
3087 void *ptr = nullptr;
3088 thread->stats.AddMessage("(each op is 1000 loads)");
3089 while (count < 100000) {
3090 for (int i = 0; i < 1000; i++) {
3091 ptr = ap.load(std::memory_order_acquire);
3092 }
3093 count++;
3094 thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
3095 }
3096 if (ptr == nullptr) exit(1); // Disable unused variable warning.
3097 }
3098
3099 void Compress(ThreadState *thread) {
3100 RandomGenerator gen;
3101 Slice input = gen.Generate(FLAGS_block_size);
3102 int64_t bytes = 0;
3103 int64_t produced = 0;
3104 bool ok = true;
3105 std::string compressed;
3106 CompressionOptions opts;
3107 CompressionContext context(FLAGS_compression_type_e);
3108 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
3109 FLAGS_compression_type_e,
3110 FLAGS_sample_for_compression);
3111 // Compress 1G
3112 while (ok && bytes < int64_t(1) << 30) {
3113 compressed.clear();
3114 ok = CompressSlice(info, input, &compressed);
3115 produced += compressed.size();
3116 bytes += input.size();
3117 thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
3118 }
3119
3120 if (!ok) {
3121 thread->stats.AddMessage("(compression failure)");
3122 } else {
3123 char buf[340];
3124 snprintf(buf, sizeof(buf), "(output: %.1f%%)",
3125 (produced * 100.0) / bytes);
3126 thread->stats.AddMessage(buf);
3127 thread->stats.AddBytes(bytes);
3128 }
3129 }
3130
3131 void Uncompress(ThreadState *thread) {
3132 RandomGenerator gen;
3133 Slice input = gen.Generate(FLAGS_block_size);
3134 std::string compressed;
3135
3136 CompressionContext compression_ctx(FLAGS_compression_type_e);
3137 CompressionOptions compression_opts;
3138 CompressionInfo compression_info(
3139 compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
3140 FLAGS_compression_type_e, FLAGS_sample_for_compression);
3141 UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
3142 UncompressionInfo uncompression_info(uncompression_ctx,
3143 UncompressionDict::GetEmptyDict(),
3144 FLAGS_compression_type_e);
3145
3146 bool ok = CompressSlice(compression_info, input, &compressed);
3147 int64_t bytes = 0;
3148 int decompress_size;
3149 while (ok && bytes < 1024 * 1048576) {
3150 CacheAllocationPtr uncompressed;
3151 switch (FLAGS_compression_type_e) {
3152 case rocksdb::kSnappyCompression: {
3153 // get size and allocate here to make comparison fair
3154 size_t ulength = 0;
3155 if (!Snappy_GetUncompressedLength(compressed.data(),
3156 compressed.size(), &ulength)) {
3157 ok = false;
3158 break;
3159 }
3160 uncompressed = AllocateBlock(ulength, nullptr);
3161 ok = Snappy_Uncompress(compressed.data(), compressed.size(),
3162 uncompressed.get());
3163 break;
3164 }
3165 case rocksdb::kZlibCompression:
3166 uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
3167 compressed.size(), &decompress_size, 2);
3168 ok = uncompressed.get() != nullptr;
3169 break;
3170 case rocksdb::kBZip2Compression:
3171 uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
3172 &decompress_size, 2);
3173 ok = uncompressed.get() != nullptr;
3174 break;
3175 case rocksdb::kLZ4Compression:
3176 uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
3177 compressed.size(), &decompress_size, 2);
3178 ok = uncompressed.get() != nullptr;
3179 break;
3180 case rocksdb::kLZ4HCCompression:
3181 uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
3182 compressed.size(), &decompress_size, 2);
3183 ok = uncompressed.get() != nullptr;
3184 break;
3185 case rocksdb::kXpressCompression:
3186 uncompressed.reset(XPRESS_Uncompress(
3187 compressed.data(), compressed.size(), &decompress_size));
3188 ok = uncompressed.get() != nullptr;
3189 break;
3190 case rocksdb::kZSTD:
3191 uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
3192 compressed.size(), &decompress_size);
3193 ok = uncompressed.get() != nullptr;
3194 break;
3195 default:
3196 ok = false;
3197 }
3198 bytes += input.size();
3199 thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
3200 }
3201
3202 if (!ok) {
3203 thread->stats.AddMessage("(compression failure)");
3204 } else {
3205 thread->stats.AddBytes(bytes);
3206 }
3207 }
3208
3209 // Returns true if the options is initialized from the specified
3210 // options file.
3211 bool InitializeOptionsFromFile(Options* opts) {
3212 #ifndef ROCKSDB_LITE
3213 printf("Initializing RocksDB Options from the specified file\n");
3214 DBOptions db_opts;
3215 std::vector<ColumnFamilyDescriptor> cf_descs;
3216 if (FLAGS_options_file != "") {
3217 auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
3218 &cf_descs);
3219 if (s.ok()) {
3220 *opts = Options(db_opts, cf_descs[0].options);
3221 return true;
3222 }
3223 fprintf(stderr, "Unable to load options file %s --- %s\n",
3224 FLAGS_options_file.c_str(), s.ToString().c_str());
3225 exit(1);
3226 }
3227 #else
3228 (void)opts;
3229 #endif
3230 return false;
3231 }
3232
3233 void InitializeOptionsFromFlags(Options* opts) {
3234 printf("Initializing RocksDB Options from command-line flags\n");
3235 Options& options = *opts;
3236
3237 assert(db_.db == nullptr);
3238
3239 options.max_open_files = FLAGS_open_files;
3240 if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
3241 options.write_buffer_manager.reset(
3242 new WriteBufferManager(FLAGS_db_write_buffer_size, cache_));
3243 }
3244 options.write_buffer_size = FLAGS_write_buffer_size;
3245 options.max_write_buffer_number = FLAGS_max_write_buffer_number;
3246 options.min_write_buffer_number_to_merge =
3247 FLAGS_min_write_buffer_number_to_merge;
3248 options.max_write_buffer_number_to_maintain =
3249 FLAGS_max_write_buffer_number_to_maintain;
3250 options.max_background_jobs = FLAGS_max_background_jobs;
3251 options.max_background_compactions = FLAGS_max_background_compactions;
3252 options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
3253 options.max_background_flushes = FLAGS_max_background_flushes;
3254 options.compaction_style = FLAGS_compaction_style_e;
3255 options.compaction_pri = FLAGS_compaction_pri_e;
3256 options.allow_mmap_reads = FLAGS_mmap_read;
3257 options.allow_mmap_writes = FLAGS_mmap_write;
3258 options.use_direct_reads = FLAGS_use_direct_reads;
3259 options.use_direct_io_for_flush_and_compaction =
3260 FLAGS_use_direct_io_for_flush_and_compaction;
3261 #ifndef ROCKSDB_LITE
3262 options.ttl = FLAGS_fifo_compaction_ttl;
3263 options.compaction_options_fifo = CompactionOptionsFIFO(
3264 FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
3265 FLAGS_fifo_compaction_allow_compaction);
3266 #endif // ROCKSDB_LITE
3267 if (FLAGS_prefix_size != 0) {
3268 options.prefix_extractor.reset(
3269 NewFixedPrefixTransform(FLAGS_prefix_size));
3270 }
3271 if (FLAGS_use_uint64_comparator) {
3272 options.comparator = test::Uint64Comparator();
3273 if (FLAGS_key_size != 8) {
3274 fprintf(stderr, "Using Uint64 comparator but key size is not 8.\n");
3275 exit(1);
3276 }
3277 }
3278 if (FLAGS_use_stderr_info_logger) {
3279 options.info_log.reset(new StderrLogger());
3280 }
3281 options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
3282 options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
3283 options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
3284 if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
3285 options.memtable_insert_with_hint_prefix_extractor.reset(
3286 NewCappedPrefixTransform(
3287 FLAGS_memtable_insert_with_hint_prefix_size));
3288 }
3289 options.bloom_locality = FLAGS_bloom_locality;
3290 options.max_file_opening_threads = FLAGS_file_opening_threads;
3291 options.new_table_reader_for_compaction_inputs =
3292 FLAGS_new_table_reader_for_compaction_inputs;
3293 options.compaction_readahead_size = FLAGS_compaction_readahead_size;
3294 options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
3295 options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
3296 options.use_fsync = FLAGS_use_fsync;
3297 options.num_levels = FLAGS_num_levels;
3298 options.target_file_size_base = FLAGS_target_file_size_base;
3299 options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
3300 options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
3301 options.level_compaction_dynamic_level_bytes =
3302 FLAGS_level_compaction_dynamic_level_bytes;
3303 options.max_bytes_for_level_multiplier =
3304 FLAGS_max_bytes_for_level_multiplier;
3305 if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
3306 FLAGS_rep_factory == kHashLinkedList)) {
3307 fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
3308 "HashLinkedList memtablerep is used\n");
3309 exit(1);
3310 }
3311 switch (FLAGS_rep_factory) {
3312 case kSkipList:
3313 options.memtable_factory.reset(new SkipListFactory(
3314 FLAGS_skip_list_lookahead));
3315 break;
3316 #ifndef ROCKSDB_LITE
3317 case kPrefixHash:
3318 options.memtable_factory.reset(
3319 NewHashSkipListRepFactory(FLAGS_hash_bucket_count));
3320 break;
3321 case kHashLinkedList:
3322 options.memtable_factory.reset(NewHashLinkListRepFactory(
3323 FLAGS_hash_bucket_count));
3324 break;
3325 case kVectorRep:
3326 options.memtable_factory.reset(
3327 new VectorRepFactory
3328 );
3329 break;
3330 #else
3331 default:
3332 fprintf(stderr, "Only skip list is supported in lite mode\n");
3333 exit(1);
3334 #endif // ROCKSDB_LITE
3335 }
3336 if (FLAGS_use_plain_table) {
3337 #ifndef ROCKSDB_LITE
3338 if (FLAGS_rep_factory != kPrefixHash &&
3339 FLAGS_rep_factory != kHashLinkedList) {
3340 fprintf(stderr, "Warning: plain table is used with skipList\n");
3341 }
3342
3343 int bloom_bits_per_key = FLAGS_bloom_bits;
3344 if (bloom_bits_per_key < 0) {
3345 bloom_bits_per_key = 0;
3346 }
3347
3348 PlainTableOptions plain_table_options;
3349 plain_table_options.user_key_len = FLAGS_key_size;
3350 plain_table_options.bloom_bits_per_key = bloom_bits_per_key;
3351 plain_table_options.hash_table_ratio = 0.75;
3352 options.table_factory = std::shared_ptr<TableFactory>(
3353 NewPlainTableFactory(plain_table_options));
3354 #else
3355 fprintf(stderr, "Plain table is not supported in lite mode\n");
3356 exit(1);
3357 #endif // ROCKSDB_LITE
3358 } else if (FLAGS_use_cuckoo_table) {
3359 #ifndef ROCKSDB_LITE
3360 if (FLAGS_cuckoo_hash_ratio > 1 || FLAGS_cuckoo_hash_ratio < 0) {
3361 fprintf(stderr, "Invalid cuckoo_hash_ratio\n");
3362 exit(1);
3363 }
3364
3365 if (!FLAGS_mmap_read) {
3366 fprintf(stderr, "cuckoo table format requires mmap read to operate\n");
3367 exit(1);
3368 }
3369
3370 rocksdb::CuckooTableOptions table_options;
3371 table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
3372 table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
3373 options.table_factory = std::shared_ptr<TableFactory>(
3374 NewCuckooTableFactory(table_options));
3375 #else
3376 fprintf(stderr, "Cuckoo table is not supported in lite mode\n");
3377 exit(1);
3378 #endif // ROCKSDB_LITE
3379 } else {
3380 BlockBasedTableOptions block_based_options;
3381 if (FLAGS_use_hash_search) {
3382 if (FLAGS_prefix_size == 0) {
3383 fprintf(stderr,
3384 "prefix_size not assigned when enable use_hash_search \n");
3385 exit(1);
3386 }
3387 block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
3388 } else {
3389 block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
3390 }
3391 if (FLAGS_partition_index_and_filters || FLAGS_partition_index) {
3392 if (FLAGS_use_hash_search) {
3393 fprintf(stderr,
3394 "use_hash_search is incompatible with "
3395 "partition index and is ignored");
3396 }
3397 block_based_options.index_type =
3398 BlockBasedTableOptions::kTwoLevelIndexSearch;
3399 block_based_options.metadata_block_size = FLAGS_metadata_block_size;
3400 if (FLAGS_partition_index_and_filters) {
3401 block_based_options.partition_filters = true;
3402 }
3403 }
3404 if (cache_ == nullptr) {
3405 block_based_options.no_block_cache = true;
3406 }
3407 block_based_options.cache_index_and_filter_blocks =
3408 FLAGS_cache_index_and_filter_blocks;
3409 block_based_options.pin_l0_filter_and_index_blocks_in_cache =
3410 FLAGS_pin_l0_filter_and_index_blocks_in_cache;
3411 block_based_options.pin_top_level_index_and_filter =
3412 FLAGS_pin_top_level_index_and_filter;
3413 if (FLAGS_cache_high_pri_pool_ratio > 1e-6) { // > 0.0 + eps
3414 block_based_options.cache_index_and_filter_blocks_with_high_priority =
3415 true;
3416 }
3417 block_based_options.block_cache = cache_;
3418 block_based_options.block_cache_compressed = compressed_cache_;
3419 block_based_options.block_size = FLAGS_block_size;
3420 block_based_options.block_restart_interval = FLAGS_block_restart_interval;
3421 block_based_options.index_block_restart_interval =
3422 FLAGS_index_block_restart_interval;
3423 block_based_options.filter_policy = filter_policy_;
3424 block_based_options.format_version =
3425 static_cast<uint32_t>(FLAGS_format_version);
3426 block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
3427 block_based_options.enable_index_compression =
3428 FLAGS_enable_index_compression;
3429 block_based_options.block_align = FLAGS_block_align;
3430 if (FLAGS_use_data_block_hash_index) {
3431 block_based_options.data_block_index_type =
3432 rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
3433 } else {
3434 block_based_options.data_block_index_type =
3435 rocksdb::BlockBasedTableOptions::kDataBlockBinarySearch;
3436 }
3437 block_based_options.data_block_hash_table_util_ratio =
3438 FLAGS_data_block_hash_table_util_ratio;
3439 if (FLAGS_read_cache_path != "") {
3440 #ifndef ROCKSDB_LITE
3441 Status rc_status;
3442
3443 // The read cache needs to be provided with a Logger; we will put all
3444 // read cache logs in the read cache path in a file named rc_LOG
3445 rc_status = FLAGS_env->CreateDirIfMissing(FLAGS_read_cache_path);
3446 std::shared_ptr<Logger> read_cache_logger;
3447 if (rc_status.ok()) {
3448 rc_status = FLAGS_env->NewLogger(FLAGS_read_cache_path + "/rc_LOG",
3449 &read_cache_logger);
3450 }
3451
3452 if (rc_status.ok()) {
3453 PersistentCacheConfig rc_cfg(FLAGS_env, FLAGS_read_cache_path,
3454 FLAGS_read_cache_size,
3455 read_cache_logger);
3456
3457 rc_cfg.enable_direct_reads = FLAGS_read_cache_direct_read;
3458 rc_cfg.enable_direct_writes = FLAGS_read_cache_direct_write;
3459 rc_cfg.writer_qdepth = 4;
3460 rc_cfg.writer_dispatch_size = 4 * 1024;
3461
3462 auto pcache = std::make_shared<BlockCacheTier>(rc_cfg);
3463 block_based_options.persistent_cache = pcache;
3464 rc_status = pcache->Open();
3465 }
3466
3467 if (!rc_status.ok()) {
3468 fprintf(stderr, "Error initializing read cache, %s\n",
3469 rc_status.ToString().c_str());
3470 exit(1);
3471 }
3472 #else
3473 fprintf(stderr, "Read cache is not supported in LITE\n");
3474 exit(1);
3475
3476 #endif
3477 }
3478 options.table_factory.reset(
3479 NewBlockBasedTableFactory(block_based_options));
3480 }
3481 if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
3482 if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
3483 (unsigned int)FLAGS_num_levels) {
3484 fprintf(stderr, "Mismatched number of fanouts specified %d\n",
3485 (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
3486 exit(1);
3487 }
3488 options.max_bytes_for_level_multiplier_additional =
3489 FLAGS_max_bytes_for_level_multiplier_additional_v;
3490 }
3491 options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
3492 options.level0_file_num_compaction_trigger =
3493 FLAGS_level0_file_num_compaction_trigger;
3494 options.level0_slowdown_writes_trigger =
3495 FLAGS_level0_slowdown_writes_trigger;
3496 options.compression = FLAGS_compression_type_e;
3497 options.sample_for_compression = FLAGS_sample_for_compression;
3498 options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
3499 options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
3500 options.max_total_wal_size = FLAGS_max_total_wal_size;
3501
3502 if (FLAGS_min_level_to_compress >= 0) {
3503 assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
3504 options.compression_per_level.resize(FLAGS_num_levels);
3505 for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
3506 options.compression_per_level[i] = kNoCompression;
3507 }
3508 for (int i = FLAGS_min_level_to_compress;
3509 i < FLAGS_num_levels; i++) {
3510 options.compression_per_level[i] = FLAGS_compression_type_e;
3511 }
3512 }
3513 options.soft_rate_limit = FLAGS_soft_rate_limit;
3514 options.hard_rate_limit = FLAGS_hard_rate_limit;
3515 options.soft_pending_compaction_bytes_limit =
3516 FLAGS_soft_pending_compaction_bytes_limit;
3517 options.hard_pending_compaction_bytes_limit =
3518 FLAGS_hard_pending_compaction_bytes_limit;
3519 options.delayed_write_rate = FLAGS_delayed_write_rate;
3520 options.allow_concurrent_memtable_write =
3521 FLAGS_allow_concurrent_memtable_write;
3522 options.inplace_update_support = FLAGS_inplace_update_support;
3523 options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
3524 options.enable_write_thread_adaptive_yield =
3525 FLAGS_enable_write_thread_adaptive_yield;
3526 options.enable_pipelined_write = FLAGS_enable_pipelined_write;
3527 options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
3528 options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
3529 options.rate_limit_delay_max_milliseconds =
3530 FLAGS_rate_limit_delay_max_milliseconds;
3531 options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
3532 options.max_compaction_bytes = FLAGS_max_compaction_bytes;
3533 options.disable_auto_compactions = FLAGS_disable_auto_compactions;
3534 options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
3535
3536 // fill storage options
3537 options.advise_random_on_open = FLAGS_advise_random_on_open;
3538 options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
3539 options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
3540 options.bytes_per_sync = FLAGS_bytes_per_sync;
3541 options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
3542
3543 // merge operator options
3544 options.merge_operator = MergeOperators::CreateFromStringId(
3545 FLAGS_merge_operator);
3546 if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
3547 fprintf(stderr, "invalid merge operator: %s\n",
3548 FLAGS_merge_operator.c_str());
3549 exit(1);
3550 }
3551 options.max_successive_merges = FLAGS_max_successive_merges;
3552 options.report_bg_io_stats = FLAGS_report_bg_io_stats;
3553
3554 // set universal style compaction configurations, if applicable
3555 if (FLAGS_universal_size_ratio != 0) {
3556 options.compaction_options_universal.size_ratio =
3557 FLAGS_universal_size_ratio;
3558 }
3559 if (FLAGS_universal_min_merge_width != 0) {
3560 options.compaction_options_universal.min_merge_width =
3561 FLAGS_universal_min_merge_width;
3562 }
3563 if (FLAGS_universal_max_merge_width != 0) {
3564 options.compaction_options_universal.max_merge_width =
3565 FLAGS_universal_max_merge_width;
3566 }
3567 if (FLAGS_universal_max_size_amplification_percent != 0) {
3568 options.compaction_options_universal.max_size_amplification_percent =
3569 FLAGS_universal_max_size_amplification_percent;
3570 }
3571 if (FLAGS_universal_compression_size_percent != -1) {
3572 options.compaction_options_universal.compression_size_percent =
3573 FLAGS_universal_compression_size_percent;
3574 }
3575 options.compaction_options_universal.allow_trivial_move =
3576 FLAGS_universal_allow_trivial_move;
3577 if (FLAGS_thread_status_per_interval > 0) {
3578 options.enable_thread_tracking = true;
3579 }
3580
3581 #ifndef ROCKSDB_LITE
3582 if (FLAGS_readonly && FLAGS_transaction_db) {
3583 fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
3584 exit(1);
3585 }
3586 #endif // ROCKSDB_LITE
3587
3588 }
3589
3590 void InitializeOptionsGeneral(Options* opts) {
3591 Options& options = *opts;
3592
3593 options.create_missing_column_families = FLAGS_num_column_families > 1;
3594 options.statistics = dbstats;
3595 options.wal_dir = FLAGS_wal_dir;
3596 options.create_if_missing = !FLAGS_use_existing_db;
3597 options.dump_malloc_stats = FLAGS_dump_malloc_stats;
3598 options.stats_dump_period_sec =
3599 static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
3600 options.stats_persist_period_sec =
3601 static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
3602 options.stats_history_buffer_size =
3603 static_cast<size_t>(FLAGS_stats_history_buffer_size);
3604
3605 options.compression_opts.level = FLAGS_compression_level;
3606 options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
3607 options.compression_opts.zstd_max_train_bytes =
3608 FLAGS_compression_zstd_max_train_bytes;
3609 // If this is a block based table, set some related options
3610 if (options.table_factory->Name() == BlockBasedTableFactory::kName &&
3611 options.table_factory->GetOptions() != nullptr) {
3612 BlockBasedTableOptions* table_options =
3613 reinterpret_cast<BlockBasedTableOptions*>(
3614 options.table_factory->GetOptions());
3615 if (FLAGS_cache_size) {
3616 table_options->block_cache = cache_;
3617 }
3618 if (FLAGS_bloom_bits >= 0) {
3619 table_options->filter_policy.reset(NewBloomFilterPolicy(
3620 FLAGS_bloom_bits, FLAGS_use_block_based_filter));
3621 }
3622 }
3623 if (FLAGS_row_cache_size) {
3624 if (FLAGS_cache_numshardbits >= 1) {
3625 options.row_cache =
3626 NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
3627 } else {
3628 options.row_cache = NewLRUCache(FLAGS_row_cache_size);
3629 }
3630 }
3631 if (FLAGS_enable_io_prio) {
3632 FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
3633 FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
3634 }
3635 if (FLAGS_enable_cpu_prio) {
3636 FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW);
3637 FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH);
3638 }
3639 options.env = FLAGS_env;
3640 if (FLAGS_sine_write_rate) {
3641 FLAGS_benchmark_write_rate_limit = static_cast<uint64_t>(SineRate(0));
3642 }
3643
3644 if (FLAGS_rate_limiter_bytes_per_sec > 0) {
3645 if (FLAGS_rate_limit_bg_reads &&
3646 !FLAGS_new_table_reader_for_compaction_inputs) {
3647 fprintf(stderr,
3648 "rate limit compaction reads must have "
3649 "new_table_reader_for_compaction_inputs set\n");
3650 exit(1);
3651 }
3652 options.rate_limiter.reset(NewGenericRateLimiter(
3653 FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
3654 10 /* fairness */,
3655 FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
3656 : RateLimiter::Mode::kWritesOnly,
3657 FLAGS_rate_limiter_auto_tuned));
3658 }
3659
3660 options.listeners.emplace_back(listener_);
3661 if (FLAGS_num_multi_db <= 1) {
3662 OpenDb(options, FLAGS_db, &db_);
3663 } else {
3664 multi_dbs_.clear();
3665 multi_dbs_.resize(FLAGS_num_multi_db);
3666 auto wal_dir = options.wal_dir;
3667 for (int i = 0; i < FLAGS_num_multi_db; i++) {
3668 if (!wal_dir.empty()) {
3669 options.wal_dir = GetPathForMultiple(wal_dir, i);
3670 }
3671 OpenDb(options, GetPathForMultiple(FLAGS_db, i), &multi_dbs_[i]);
3672 }
3673 options.wal_dir = wal_dir;
3674 }
3675
3676 // KeepFilter is a no-op filter; it can be used to exercise the compaction filter path
3677 if (FLAGS_use_keep_filter) {
3678 options.compaction_filter = new KeepFilter();
3679 fprintf(stdout, "A noop compaction filter is used\n");
3680 }
3681
3682 if (FLAGS_use_existing_keys) {
3683 // Only works on a single database
3684 assert(db_.db != nullptr);
3685 ReadOptions read_opts;
3686 read_opts.total_order_seek = true;
3687 Iterator* iter = db_.db->NewIterator(read_opts);
3688 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
3689 keys_.emplace_back(iter->key().ToString());
3690 }
3691 delete iter;
3692 FLAGS_num = keys_.size();
3693 }
3694 }
3695
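// Build the final options: prefer the options file when one was supplied,
// otherwise derive everything from command-line flags, then apply the
// settings that are common to both paths.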
3696 void Open(Options* opts) {
3697 if (!InitializeOptionsFromFile(opts)) {
3698 InitializeOptionsFromFlags(opts);
3699 }
3700
3701 InitializeOptionsGeneral(opts);
3702 }
3703
3704 void OpenDb(Options options, const std::string& db_name,
3705 DBWithColumnFamilies* db) {
3706 Status s;
3707 // Open with column families if necessary.
3708 if (FLAGS_num_column_families > 1) {
3709 size_t num_hot = FLAGS_num_column_families;
3710 if (FLAGS_num_hot_column_families > 0 &&
3711 FLAGS_num_hot_column_families < FLAGS_num_column_families) {
3712 num_hot = FLAGS_num_hot_column_families;
3713 } else {
3714 FLAGS_num_hot_column_families = FLAGS_num_column_families;
3715 }
3716 std::vector<ColumnFamilyDescriptor> column_families;
3717 for (size_t i = 0; i < num_hot; i++) {
3718 column_families.push_back(ColumnFamilyDescriptor(
3719 ColumnFamilyName(i), ColumnFamilyOptions(options)));
3720 }
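// Optionally parse the comma-separated per-column-family access weights;
// they are percentages that must sum to 100, one entry per hot column
// family.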
3721 std::vector<int> cfh_idx_to_prob;
3722 if (!FLAGS_column_family_distribution.empty()) {
3723 std::stringstream cf_prob_stream(FLAGS_column_family_distribution);
3724 std::string cf_prob;
3725 int sum = 0;
3726 while (std::getline(cf_prob_stream, cf_prob, ',')) {
3727 cfh_idx_to_prob.push_back(std::stoi(cf_prob));
3728 sum += cfh_idx_to_prob.back();
3729 }
3730 if (sum != 100) {
3731 fprintf(stderr, "column_family_distribution items must sum to 100\n");
3732 exit(1);
3733 }
3734 if (cfh_idx_to_prob.size() != num_hot) {
3735 fprintf(stderr,
3736 "got %" ROCKSDB_PRIszt
3737 " column_family_distribution items; expected "
3738 "%" ROCKSDB_PRIszt "\n",
3739 cfh_idx_to_prob.size(), num_hot);
3740 exit(1);
3741 }
3742 }
3743 #ifndef ROCKSDB_LITE
3744 if (FLAGS_readonly) {
3745 s = DB::OpenForReadOnly(options, db_name, column_families,
3746 &db->cfh, &db->db);
3747 } else if (FLAGS_optimistic_transaction_db) {
3748 s = OptimisticTransactionDB::Open(options, db_name, column_families,
3749 &db->cfh, &db->opt_txn_db);
3750 if (s.ok()) {
3751 db->db = db->opt_txn_db->GetBaseDB();
3752 }
3753 } else if (FLAGS_transaction_db) {
3754 TransactionDB* ptr;
3755 TransactionDBOptions txn_db_options;
3756 s = TransactionDB::Open(options, txn_db_options, db_name,
3757 column_families, &db->cfh, &ptr);
3758 if (s.ok()) {
3759 db->db = ptr;
3760 }
3761 } else {
3762 s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
3763 }
3764 #else
3765 s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
3766 #endif // ROCKSDB_LITE
3767 db->cfh.resize(FLAGS_num_column_families);
3768 db->num_created = num_hot;
3769 db->num_hot = num_hot;
3770 db->cfh_idx_to_prob = std::move(cfh_idx_to_prob);
3771 #ifndef ROCKSDB_LITE
3772 } else if (FLAGS_readonly) {
3773 s = DB::OpenForReadOnly(options, db_name, &db->db);
3774 } else if (FLAGS_optimistic_transaction_db) {
3775 s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db);
3776 if (s.ok()) {
3777 db->db = db->opt_txn_db->GetBaseDB();
3778 }
3779 } else if (FLAGS_transaction_db) {
3780 TransactionDB* ptr = nullptr;
3781 TransactionDBOptions txn_db_options;
3782 s = CreateLoggerFromOptions(db_name, options, &options.info_log);
3783 if (s.ok()) {
3784 s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
3785 }
3786 if (s.ok()) {
3787 db->db = ptr;
3788 }
3789 } else if (FLAGS_use_blob_db) {
3790 blob_db::BlobDBOptions blob_db_options;
3791 blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
3792 blob_db_options.is_fifo = FLAGS_blob_db_is_fifo;
3793 blob_db_options.max_db_size = FLAGS_blob_db_max_db_size;
3794 blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs;
3795 blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
3796 blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
3797 blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
3798 blob_db::BlobDB* ptr = nullptr;
3799 s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
3800 if (s.ok()) {
3801 db->db = ptr;
3802 }
3803 #endif // ROCKSDB_LITE
3804 } else {
3805 s = DB::Open(options, db_name, &db->db);
3806 }
3807 if (!s.ok()) {
3808 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
3809 exit(1);
3810 }
3811 }
3812
3813 enum WriteMode {
3814 RANDOM, SEQUENTIAL, UNIQUE_RANDOM
3815 };
3816
3817 void WriteSeqDeterministic(ThreadState* thread) {
3818 DoDeterministicCompact(thread, open_options_.compaction_style, SEQUENTIAL);
3819 }
3820
3821 void WriteUniqueRandomDeterministic(ThreadState* thread) {
3822 DoDeterministicCompact(thread, open_options_.compaction_style,
3823 UNIQUE_RANDOM);
3824 }
3825
3826 void WriteSeq(ThreadState* thread) {
3827 DoWrite(thread, SEQUENTIAL);
3828 }
3829
3830 void WriteRandom(ThreadState* thread) {
3831 DoWrite(thread, RANDOM);
3832 }
3833
3834 void WriteUniqueRandom(ThreadState* thread) {
3835 DoWrite(thread, UNIQUE_RANDOM);
3836 }
3837
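// Produces keys for DoWrite: monotonically increasing for SEQUENTIAL,
// uniformly random for RANDOM, and a pre-shuffled permutation of [0, num)
// for UNIQUE_RANDOM.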
3838 class KeyGenerator {
3839 public:
3840 KeyGenerator(Random64* rand, WriteMode mode, uint64_t num,
3841 uint64_t /*num_per_set*/ = 64 * 1024)
3842 : rand_(rand), mode_(mode), num_(num), next_(0) {
3843 if (mode_ == UNIQUE_RANDOM) {
3844 // NOTE: if memory consumption of this approach becomes a concern,
3845 // we can either break it into pieces and only random shuffle a section
3846 // each time. Alternatively, use a bit map implementation
3847 // (https://reviews.facebook.net/differential/diff/54627/)
3848 values_.resize(num_);
3849 for (uint64_t i = 0; i < num_; ++i) {
3850 values_[i] = i;
3851 }
3852 std::shuffle(
3853 values_.begin(), values_.end(),
3854 std::default_random_engine(static_cast<unsigned int>(FLAGS_seed)));
3855 }
3856 }
3857
3858 uint64_t Next() {
3859 switch (mode_) {
3860 case SEQUENTIAL:
3861 return next_++;
3862 case RANDOM:
3863 return rand_->Next() % num_;
3864 case UNIQUE_RANDOM:
3865 assert(next_ < num_);
3866 return values_[next_++];
3867 }
3868 assert(false);
3869 return std::numeric_limits<uint64_t>::max();
3870 }
3871
3872 private:
3873 Random64* rand_;
3874 WriteMode mode_;
3875 const uint64_t num_;
3876 uint64_t next_;
3877 std::vector<uint64_t> values_;
3878 };
3879
3880 DB* SelectDB(ThreadState* thread) {
3881 return SelectDBWithCfh(thread)->db;
3882 }
3883
3884 DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
3885 return SelectDBWithCfh(thread->rand.Next());
3886 }
3887
3888 DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
3889 if (db_.db != nullptr) {
3890 return &db_;
3891 } else {
3892 return &multi_dbs_[rand_int % multi_dbs_.size()];
3893 }
3894 }
3895
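// Sine-wave rate used by --sine_write_rate and the mixgraph benchmark:
// a * sin(b * x + c) + d, with x in seconds since the benchmark started.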
3896 double SineRate(double x) {
3897 return FLAGS_sine_a * sin((FLAGS_sine_b * x) + FLAGS_sine_c) + FLAGS_sine_d;
3898 }
3899
3900 void DoWrite(ThreadState* thread, WriteMode write_mode) {
3901 const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
3902 const int64_t num_ops = writes_ == 0 ? num_ : writes_;
3903
3904 size_t num_key_gens = 1;
3905 if (db_.db == nullptr) {
3906 num_key_gens = multi_dbs_.size();
3907 }
3908 std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
3909 int64_t max_ops = num_ops * num_key_gens;
3910 int64_t ops_per_stage = max_ops;
3911 if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
3912 ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families /
3913 FLAGS_num_hot_column_families) +
3914 1;
3915 }
3916
3917 Duration duration(test_duration, max_ops, ops_per_stage);
3918 for (size_t i = 0; i < num_key_gens; i++) {
3919 key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode,
3920 num_ + max_num_range_tombstones_,
3921 ops_per_stage));
3922 }
3923
3924 if (num_ != FLAGS_num) {
3925 char msg[100];
3926 snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
3927 thread->stats.AddMessage(msg);
3928 }
3929
3930 RandomGenerator gen;
3931 WriteBatch batch;
3932 Status s;
3933 int64_t bytes = 0;
3934
3935 std::unique_ptr<const char[]> key_guard;
3936 Slice key = AllocateKey(&key_guard);
3937 std::unique_ptr<const char[]> begin_key_guard;
3938 Slice begin_key = AllocateKey(&begin_key_guard);
3939 std::unique_ptr<const char[]> end_key_guard;
3940 Slice end_key = AllocateKey(&end_key_guard);
3941 std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
3942 std::vector<Slice> expanded_keys;
3943 if (FLAGS_expand_range_tombstones) {
3944 expanded_key_guards.resize(range_tombstone_width_);
3945 for (auto& expanded_key_guard : expanded_key_guards) {
3946 expanded_keys.emplace_back(AllocateKey(&expanded_key_guard));
3947 }
3948 }
3949
3950 int64_t stage = 0;
3951 int64_t num_written = 0;
3952 while (!duration.Done(entries_per_batch_)) {
3953 if (duration.GetStage() != stage) {
3954 stage = duration.GetStage();
3955 if (db_.db != nullptr) {
3956 db_.CreateNewCf(open_options_, stage);
3957 } else {
3958 for (auto& db : multi_dbs_) {
3959 db.CreateNewCf(open_options_, stage);
3960 }
3961 }
3962 }
3963
3964 size_t id = thread->rand.Next() % num_key_gens;
3965 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
3966 batch.Clear();
3967
3968 if (thread->shared->write_rate_limiter.get() != nullptr) {
3969 thread->shared->write_rate_limiter->Request(
3970 entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
3971 nullptr /* stats */, RateLimiter::OpType::kWrite);
3972 // Set time at which last op finished to Now() to hide latency and
3973 // sleep from rate limiter. Also, do the check once per batch, not
3974 // once per write.
3975 thread->stats.ResetLastOpTime();
3976 }
3977
3978 for (int64_t j = 0; j < entries_per_batch_; j++) {
3979 int64_t rand_num = key_gens[id]->Next();
3980 GenerateKeyFromInt(rand_num, FLAGS_num, &key);
3981 if (use_blob_db_) {
3982 #ifndef ROCKSDB_LITE
3983 Slice val = gen.Generate(value_size_);
3984 int ttl = rand() % FLAGS_blob_db_max_ttl_range;
3985 blob_db::BlobDB* blobdb =
3986 static_cast<blob_db::BlobDB*>(db_with_cfh->db);
3987 s = blobdb->PutWithTTL(write_options_, key, val, ttl);
3988 #endif // ROCKSDB_LITE
3989 } else if (FLAGS_num_column_families <= 1) {
3990 batch.Put(key, gen.Generate(value_size_));
3991 } else {
3992 // We use the same rand_num as seed for key and column family so that we
3993 // can deterministically find the cfh corresponding to a particular
3994 // key while reading the key.
3995 batch.Put(db_with_cfh->GetCfh(rand_num), key,
3996 gen.Generate(value_size_));
3997 }
3998 bytes += value_size_ + key_size_;
3999 ++num_written;
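// Periodically drop a range tombstone: after writes_before_delete_range_
// initial writes, emit one every writes_per_range_tombstone_ writes, up
// to max_num_range_tombstones_ tombstones in total.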
4000 if (writes_per_range_tombstone_ > 0 &&
4001 num_written > writes_before_delete_range_ &&
4002 (num_written - writes_before_delete_range_) /
4003 writes_per_range_tombstone_ <=
4004 max_num_range_tombstones_ &&
4005 (num_written - writes_before_delete_range_) %
4006 writes_per_range_tombstone_ ==
4007 0) {
4008 int64_t begin_num = key_gens[id]->Next();
4009 if (FLAGS_expand_range_tombstones) {
4010 for (int64_t offset = 0; offset < range_tombstone_width_;
4011 ++offset) {
4012 GenerateKeyFromInt(begin_num + offset, FLAGS_num,
4013 &expanded_keys[offset]);
4014 if (use_blob_db_) {
4015 #ifndef ROCKSDB_LITE
4016 s = db_with_cfh->db->Delete(write_options_,
4017 expanded_keys[offset]);
4018 #endif // ROCKSDB_LITE
4019 } else if (FLAGS_num_column_families <= 1) {
4020 batch.Delete(expanded_keys[offset]);
4021 } else {
4022 batch.Delete(db_with_cfh->GetCfh(rand_num),
4023 expanded_keys[offset]);
4024 }
4025 }
4026 } else {
4027 GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
4028 GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
4029 &end_key);
4030 if (use_blob_db_) {
4031 #ifndef ROCKSDB_LITE
4032 s = db_with_cfh->db->DeleteRange(
4033 write_options_, db_with_cfh->db->DefaultColumnFamily(),
4034 begin_key, end_key);
4035 #endif // ROCKSDB_LITE
4036 } else if (FLAGS_num_column_families <= 1) {
4037 batch.DeleteRange(begin_key, end_key);
4038 } else {
4039 batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key,
4040 end_key);
4041 }
4042 }
4043 }
4044 }
4045 if (!use_blob_db_) {
4046 s = db_with_cfh->db->Write(write_options_, &batch);
4047 }
4048 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
4049 entries_per_batch_, kWrite);
4050 if (FLAGS_sine_write_rate) {
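// Periodically recompute the target write rate from the sine function
// and install a fresh rate limiter with the new rate.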
4051 uint64_t now = FLAGS_env->NowMicros();
4052
4053 uint64_t usecs_since_last;
4054 if (now > thread->stats.GetSineInterval()) {
4055 usecs_since_last = now - thread->stats.GetSineInterval();
4056 } else {
4057 usecs_since_last = 0;
4058 }
4059
4060 if (usecs_since_last >
4061 (FLAGS_sine_write_rate_interval_milliseconds * uint64_t{1000})) {
4062 double usecs_since_start =
4063 static_cast<double>(now - thread->stats.GetStart());
4064 thread->stats.ResetSineInterval();
4065 uint64_t write_rate =
4066 static_cast<uint64_t>(SineRate(usecs_since_start / 1000000.0));
4067 thread->shared->write_rate_limiter.reset(
4068 NewGenericRateLimiter(write_rate));
4069 }
4070 }
4071 if (!s.ok()) {
4072 s = listener_->WaitForRecovery(600000000) ? Status::OK() : s;
4073 }
4074
4075 if (!s.ok()) {
4076 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
4077 exit(1);
4078 }
4079 }
4080 thread->stats.AddBytes(bytes);
4081 }
4082
4083 Status DoDeterministicCompact(ThreadState* thread,
4084 CompactionStyle compaction_style,
4085 WriteMode write_mode) {
4086 #ifndef ROCKSDB_LITE
4087 ColumnFamilyMetaData meta;
4088 std::vector<DB*> db_list;
4089 if (db_.db != nullptr) {
4090 db_list.push_back(db_.db);
4091 } else {
4092 for (auto& db : multi_dbs_) {
4093 db_list.push_back(db.db);
4094 }
4095 }
4096 std::vector<Options> options_list;
4097 for (auto db : db_list) {
4098 options_list.push_back(db->GetOptions());
4099 if (compaction_style != kCompactionStyleFIFO) {
4100 db->SetOptions({{"disable_auto_compactions", "1"},
4101 {"level0_slowdown_writes_trigger", "400000000"},
4102 {"level0_stop_writes_trigger", "400000000"}});
4103 } else {
4104 db->SetOptions({{"disable_auto_compactions", "1"}});
4105 }
4106 }
4107
4108 assert(!db_list.empty());
4109 auto num_db = db_list.size();
4110 size_t num_levels = static_cast<size_t>(open_options_.num_levels);
4111 size_t output_level = open_options_.num_levels - 1;
4112 std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db);
4113 std::vector<size_t> num_files_at_level0(num_db, 0);
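// For level-style and universal-style compaction: each pass writes and
// flushes a shrinking number of keys so that every flush yields one new
// L0 sorted run; the collected runs are then compacted manually into
// their destination levels below.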
4114 if (compaction_style == kCompactionStyleLevel) {
4115 if (num_levels == 0) {
4116 return Status::InvalidArgument("num_levels should be larger than 1");
4117 }
4118 bool should_stop = false;
4119 while (!should_stop) {
4120 if (sorted_runs[0].empty()) {
4121 DoWrite(thread, write_mode);
4122 } else {
4123 DoWrite(thread, UNIQUE_RANDOM);
4124 }
4125 for (size_t i = 0; i < num_db; i++) {
4126 auto db = db_list[i];
4127 db->Flush(FlushOptions());
4128 db->GetColumnFamilyMetaData(&meta);
4129 if (num_files_at_level0[i] == meta.levels[0].files.size() ||
4130 writes_ == 0) {
4131 should_stop = true;
4132 continue;
4133 }
4134 sorted_runs[i].emplace_back(
4135 meta.levels[0].files.begin(),
4136 meta.levels[0].files.end() - num_files_at_level0[i]);
4137 num_files_at_level0[i] = meta.levels[0].files.size();
4138 if (sorted_runs[i].back().size() == 1) {
4139 should_stop = true;
4140 continue;
4141 }
4142 if (sorted_runs[i].size() == output_level) {
4143 auto& L1 = sorted_runs[i].back();
4144 L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
4145 should_stop = true;
4146 continue;
4147 }
4148 }
4149 writes_ /= static_cast<int64_t>(open_options_.max_bytes_for_level_multiplier);
4150 }
4151 for (size_t i = 0; i < num_db; i++) {
4152 if (sorted_runs[i].size() < num_levels - 1) {
4153 fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
4154 exit(1);
4155 }
4156 }
4157 for (size_t i = 0; i < num_db; i++) {
4158 auto db = db_list[i];
4159 auto compactionOptions = CompactionOptions();
4160 compactionOptions.compression = FLAGS_compression_type_e;
4161 auto options = db->GetOptions();
4162 MutableCFOptions mutable_cf_options(options);
4163 for (size_t j = 0; j < sorted_runs[i].size(); j++) {
4164 compactionOptions.output_file_size_limit =
4165 MaxFileSizeForLevel(mutable_cf_options,
4166 static_cast<int>(output_level), compaction_style);
4167 std::cout << sorted_runs[i][j].size() << std::endl;
4168 db->CompactFiles(compactionOptions, {sorted_runs[i][j].back().name,
4169 sorted_runs[i][j].front().name},
4170 static_cast<int>(output_level - j) /*level*/);
4171 }
4172 }
4173 } else if (compaction_style == kCompactionStyleUniversal) {
4174 auto ratio = open_options_.compaction_options_universal.size_ratio;
4175 bool should_stop = false;
4176 while (!should_stop) {
4177 if (sorted_runs[0].empty()) {
4178 DoWrite(thread, write_mode);
4179 } else {
4180 DoWrite(thread, UNIQUE_RANDOM);
4181 }
4182 for (size_t i = 0; i < num_db; i++) {
4183 auto db = db_list[i];
4184 db->Flush(FlushOptions());
4185 db->GetColumnFamilyMetaData(&meta);
4186 if (num_files_at_level0[i] == meta.levels[0].files.size() ||
4187 writes_ == 0) {
4188 should_stop = true;
4189 continue;
4190 }
4191 sorted_runs[i].emplace_back(
4192 meta.levels[0].files.begin(),
4193 meta.levels[0].files.end() - num_files_at_level0[i]);
4194 num_files_at_level0[i] = meta.levels[0].files.size();
4195 if (sorted_runs[i].back().size() == 1) {
4196 should_stop = true;
4197 continue;
4198 }
4199 num_files_at_level0[i] = meta.levels[0].files.size();
4200 }
4201 writes_ = static_cast<int64_t>(writes_ * static_cast<double>(100) / (ratio + 200));
4202 }
4203 for (size_t i = 0; i < num_db; i++) {
4204 if (sorted_runs[i].size() < num_levels) {
4205 fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
4206 exit(1);
4207 }
4208 }
4209 for (size_t i = 0; i < num_db; i++) {
4210 auto db = db_list[i];
4211 auto compactionOptions = CompactionOptions();
4212 compactionOptions.compression = FLAGS_compression_type_e;
4213 auto options = db->GetOptions();
4214 MutableCFOptions mutable_cf_options(options);
4215 for (size_t j = 0; j < sorted_runs[i].size(); j++) {
4216 compactionOptions.output_file_size_limit =
4217 MaxFileSizeForLevel(mutable_cf_options,
4218 static_cast<int>(output_level), compaction_style);
4219 db->CompactFiles(
4220 compactionOptions,
4221 {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
4222 (output_level > j ? static_cast<int>(output_level - j)
4223 : 0) /*level*/);
4224 }
4225 }
4226 } else if (compaction_style == kCompactionStyleFIFO) {
4227 if (num_levels != 1) {
4228 return Status::InvalidArgument(
4229 "num_levels should be 1 for FIFO compaction");
4230 }
4231 if (FLAGS_num_multi_db != 0) {
4232 return Status::InvalidArgument("Doesn't support multiDB");
4233 }
4234 auto db = db_list[0];
4235 std::vector<std::string> file_names;
4236 while (true) {
4237 if (sorted_runs[0].empty()) {
4238 DoWrite(thread, write_mode);
4239 } else {
4240 DoWrite(thread, UNIQUE_RANDOM);
4241 }
4242 db->Flush(FlushOptions());
4243 db->GetColumnFamilyMetaData(&meta);
4244 auto total_size = meta.levels[0].size;
4245 if (total_size >=
4246 db->GetOptions().compaction_options_fifo.max_table_files_size) {
4247 for (auto file_meta : meta.levels[0].files) {
4248 file_names.emplace_back(file_meta.name);
4249 }
4250 break;
4251 }
4252 }
4253 // TODO(shuzhang1989): Investigate why CompactFiles is not working
4254 // auto compactionOptions = CompactionOptions();
4255 // db->CompactFiles(compactionOptions, file_names, 0);
4256 auto compactionOptions = CompactRangeOptions();
4257 db->CompactRange(compactionOptions, nullptr, nullptr);
4258 } else {
4259 fprintf(stdout,
4260 "%-12s : skipped (-compaction_stype=kCompactionStyleNone)\n",
4261 "filldeterministic");
4262 return Status::InvalidArgument("None compaction is not supported");
4263 }
4264
4265 // Verify seqno and key range
4266 // Note: the seqno gets changed at the max level by an implementation
4267 // optimization, so skip the check for the max level.
4268 #ifndef NDEBUG
4269 for (size_t k = 0; k < num_db; k++) {
4270 auto db = db_list[k];
4271 db->GetColumnFamilyMetaData(&meta);
4272 // verify the number of sorted runs
4273 if (compaction_style == kCompactionStyleLevel) {
4274 assert(num_levels - 1 == sorted_runs[k].size());
4275 } else if (compaction_style == kCompactionStyleUniversal) {
4276 assert(meta.levels[0].files.size() + num_levels - 1 ==
4277 sorted_runs[k].size());
4278 } else if (compaction_style == kCompactionStyleFIFO) {
4279 // TODO(gzh): FIFO compaction
4280 db->GetColumnFamilyMetaData(&meta);
4281 auto total_size = meta.levels[0].size;
4282 assert(total_size <=
4283 db->GetOptions().compaction_options_fifo.max_table_files_size);
4284 break;
4285 }
4286
4287 // verify smallest/largest seqno and key range of each sorted run
4288 auto max_level = num_levels - 1;
4289 int level;
4290 for (size_t i = 0; i < sorted_runs[k].size(); i++) {
4291 level = static_cast<int>(max_level - i);
4292 SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
4293 SequenceNumber sorted_run_largest_seqno = 0;
4294 std::string sorted_run_smallest_key, sorted_run_largest_key;
4295 bool first_key = true;
4296 for (auto fileMeta : sorted_runs[k][i]) {
4297 sorted_run_smallest_seqno =
4298 std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
4299 sorted_run_largest_seqno =
4300 std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
4301 if (first_key ||
4302 db->DefaultColumnFamily()->GetComparator()->Compare(
4303 fileMeta.smallestkey, sorted_run_smallest_key) < 0) {
4304 sorted_run_smallest_key = fileMeta.smallestkey;
4305 }
4306 if (first_key ||
4307 db->DefaultColumnFamily()->GetComparator()->Compare(
4308 fileMeta.largestkey, sorted_run_largest_key) > 0) {
4309 sorted_run_largest_key = fileMeta.largestkey;
4310 }
4311 first_key = false;
4312 }
4313 if (compaction_style == kCompactionStyleLevel ||
4314 (compaction_style == kCompactionStyleUniversal && level > 0)) {
4315 SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
4316 SequenceNumber level_largest_seqno = 0;
4317 for (auto fileMeta : meta.levels[level].files) {
4318 level_smallest_seqno =
4319 std::min(level_smallest_seqno, fileMeta.smallest_seqno);
4320 level_largest_seqno =
4321 std::max(level_largest_seqno, fileMeta.largest_seqno);
4322 }
4323 assert(sorted_run_smallest_key ==
4324 meta.levels[level].files.front().smallestkey);
4325 assert(sorted_run_largest_key ==
4326 meta.levels[level].files.back().largestkey);
4327 if (level != static_cast<int>(max_level)) {
4328 // compaction at max_level would change sequence number
4329 assert(sorted_run_smallest_seqno == level_smallest_seqno);
4330 assert(sorted_run_largest_seqno == level_largest_seqno);
4331 }
4332 } else if (compaction_style == kCompactionStyleUniversal) {
4333 // level <= 0 means sorted runs on level 0
4334 auto level0_file =
4335 meta.levels[0].files[sorted_runs[k].size() - 1 - i];
4336 assert(sorted_run_smallest_key == level0_file.smallestkey);
4337 assert(sorted_run_largest_key == level0_file.largestkey);
4338 if (level != static_cast<int>(max_level)) {
4339 assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
4340 assert(sorted_run_largest_seqno == level0_file.largest_seqno);
4341 }
4342 }
4343 }
4344 }
4345 #endif
4346 // print the size of each sorted_run
4347 for (size_t k = 0; k < num_db; k++) {
4348 auto db = db_list[k];
4349 fprintf(stdout,
4350 "---------------------- DB %" ROCKSDB_PRIszt " LSM ---------------------\n", k);
4351 db->GetColumnFamilyMetaData(&meta);
4352 for (auto& levelMeta : meta.levels) {
4353 if (levelMeta.files.empty()) {
4354 continue;
4355 }
4356 if (levelMeta.level == 0) {
4357 for (auto& fileMeta : levelMeta.files) {
4358 fprintf(stdout, "Level[%d]: %s(size: %" ROCKSDB_PRIszt " bytes)\n",
4359 levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
4360 }
4361 } else {
4362 fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
4363 levelMeta.level, levelMeta.files.front().name.c_str(),
4364 levelMeta.files.back().name.c_str(), levelMeta.size);
4365 }
4366 }
4367 }
4368 for (size_t i = 0; i < num_db; i++) {
4369 db_list[i]->SetOptions(
4370 {{"disable_auto_compactions",
4371 std::to_string(options_list[i].disable_auto_compactions)},
4372 {"level0_slowdown_writes_trigger",
4373 std::to_string(options_list[i].level0_slowdown_writes_trigger)},
4374 {"level0_stop_writes_trigger",
4375 std::to_string(options_list[i].level0_stop_writes_trigger)}});
4376 }
4377 return Status::OK();
4378 #else
4379 (void)thread;
4380 (void)compaction_style;
4381 (void)write_mode;
4382 fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
4383 return Status::NotSupported(
4384 "Rocksdb Lite doesn't support filldeterministic");
4385 #endif // ROCKSDB_LITE
4386 }
4387
4388 void ReadSequential(ThreadState* thread) {
4389 if (db_.db != nullptr) {
4390 ReadSequential(thread, db_.db);
4391 } else {
4392 for (const auto& db_with_cfh : multi_dbs_) {
4393 ReadSequential(thread, db_with_cfh.db);
4394 }
4395 }
4396 }
4397
4398 void ReadSequential(ThreadState* thread, DB* db) {
4399 ReadOptions options(FLAGS_verify_checksum, true);
4400 options.tailing = FLAGS_use_tailing_iterator;
4401
4402 Iterator* iter = db->NewIterator(options);
4403 int64_t i = 0;
4404 int64_t bytes = 0;
4405 for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
4406 bytes += iter->key().size() + iter->value().size();
4407 thread->stats.FinishedOps(nullptr, db, 1, kRead);
4408 ++i;
4409
4410 if (thread->shared->read_rate_limiter.get() != nullptr &&
4411 i % 1024 == 1023) {
4412 thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
4413 nullptr /* stats */,
4414 RateLimiter::OpType::kRead);
4415 }
4416 }
4417
4418 delete iter;
4419 thread->stats.AddBytes(bytes);
4420 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
4421 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4422 get_perf_context()->ToString());
4423 }
4424 }
4425
4426 void ReadReverse(ThreadState* thread) {
4427 if (db_.db != nullptr) {
4428 ReadReverse(thread, db_.db);
4429 } else {
4430 for (const auto& db_with_cfh : multi_dbs_) {
4431 ReadReverse(thread, db_with_cfh.db);
4432 }
4433 }
4434 }
4435
4436 void ReadReverse(ThreadState* thread, DB* db) {
4437 Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
4438 int64_t i = 0;
4439 int64_t bytes = 0;
4440 for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
4441 bytes += iter->key().size() + iter->value().size();
4442 thread->stats.FinishedOps(nullptr, db, 1, kRead);
4443 ++i;
4444 if (thread->shared->read_rate_limiter.get() != nullptr &&
4445 i % 1024 == 1023) {
4446 thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
4447 nullptr /* stats */,
4448 RateLimiter::OpType::kRead);
4449 }
4450 }
4451 delete iter;
4452 thread->stats.AddBytes(bytes);
4453 }
4454
4455 void ReadRandomFast(ThreadState* thread) {
4456 int64_t read = 0;
4457 int64_t found = 0;
4458 int64_t nonexist = 0;
4459 ReadOptions options(FLAGS_verify_checksum, true);
4460 std::unique_ptr<const char[]> key_guard;
4461 Slice key = AllocateKey(&key_guard);
4462 std::string value;
4463 DB* db = SelectDBWithCfh(thread)->db;
4464
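// Round FLAGS_num up to a power of two so that a key can be drawn with a
// cheap bit mask; draws at or above FLAGS_num count as non-existent keys.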
4465 int64_t pot = 1;
4466 while (pot < FLAGS_num) {
4467 pot <<= 1;
4468 }
4469
4470 Duration duration(FLAGS_duration, reads_);
4471 do {
4472 for (int i = 0; i < 100; ++i) {
4473 int64_t key_rand = thread->rand.Next() & (pot - 1);
4474 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
4475 ++read;
4476 auto status = db->Get(options, key, &value);
4477 if (status.ok()) {
4478 ++found;
4479 } else if (!status.IsNotFound()) {
4480 fprintf(stderr, "Get returned an error: %s\n",
4481 status.ToString().c_str());
4482 abort();
4483 }
4484 if (key_rand >= FLAGS_num) {
4485 ++nonexist;
4486 }
4487 }
4488 if (thread->shared->read_rate_limiter.get() != nullptr) {
4489 thread->shared->read_rate_limiter->Request(
4490 100, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
4491 }
4492
4493 thread->stats.FinishedOps(nullptr, db, 100, kRead);
4494 } while (!duration.Done(100));
4495
4496 char msg[100];
4497 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found, "
4498 "issued %" PRIu64 " non-exist keys)\n",
4499 found, read, nonexist);
4500
4501 thread->stats.AddMessage(msg);
4502
4503 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
4504 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4505 get_perf_context()->ToString());
4506 }
4507 }
4508
4509 int64_t GetRandomKey(Random64* rand) {
4510 uint64_t rand_int = rand->Next();
4511 int64_t key_rand;
4512 if (read_random_exp_range_ == 0) {
4513 key_rand = rand_int % FLAGS_num;
4514 } else {
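// Skew the draw: exp_ran follows an exponential curve over (0, 1]
// controlled by read_random_exp_range_, so some keys are read far more
// often than others; the multiplication below scatters the hot keys
// across the key space.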
4515 const uint64_t kBigInt = static_cast<uint64_t>(1U) << 62;
4516 long double order = -static_cast<long double>(rand_int % kBigInt) /
4517 static_cast<long double>(kBigInt) *
4518 read_random_exp_range_;
4519 long double exp_ran = std::exp(order);
4520 uint64_t rand_num =
4521 static_cast<int64_t>(exp_ran * static_cast<long double>(FLAGS_num));
4522 // Map to a different number to avoid locality.
4523 const uint64_t kBigPrime = 0x5bd1e995;
4524 // Overflow behaves like %(2^64). Will have little impact on results.
4525 key_rand = static_cast<int64_t>((rand_num * kBigPrime) % FLAGS_num);
4526 }
4527 return key_rand;
4528 }
4529
4530 void ReadRandom(ThreadState* thread) {
4531 int64_t read = 0;
4532 int64_t found = 0;
4533 int64_t bytes = 0;
4534 ReadOptions options(FLAGS_verify_checksum, true);
4535 std::unique_ptr<const char[]> key_guard;
4536 Slice key = AllocateKey(&key_guard);
4537 PinnableSlice pinnable_val;
4538
4539 Duration duration(FLAGS_duration, reads_);
4540 while (!duration.Done(1)) {
4541 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
4542 // We use the same key_rand as seed for key and column family so that we can
4543 // deterministically find the cfh corresponding to a particular key, as it
4544 // is done in the DoWrite method.
4545 int64_t key_rand = GetRandomKey(&thread->rand);
4546 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
4547 read++;
4548 Status s;
4549 if (FLAGS_num_column_families > 1) {
4550 s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
4551 &pinnable_val);
4552 } else {
4553 pinnable_val.Reset();
4554 s = db_with_cfh->db->Get(options,
4555 db_with_cfh->db->DefaultColumnFamily(), key,
4556 &pinnable_val);
4557 }
4558 if (s.ok()) {
4559 found++;
4560 bytes += key.size() + pinnable_val.size();
4561 } else if (!s.IsNotFound()) {
4562 fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
4563 abort();
4564 }
4565
4566 if (thread->shared->read_rate_limiter.get() != nullptr &&
4567 read % 256 == 255) {
4568 thread->shared->read_rate_limiter->Request(
4569 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
4570 }
4571
4572 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
4573 }
4574
4575 char msg[100];
4576 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
4577 found, read);
4578
4579 thread->stats.AddBytes(bytes);
4580 thread->stats.AddMessage(msg);
4581
4582 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
4583 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4584 get_perf_context()->ToString());
4585 }
4586 }
4587
4588 // Calls MultiGet over a list of keys from a random distribution.
4589 // Reports the total number of keys found in the stats message.
4590 void MultiReadRandom(ThreadState* thread) {
4591 int64_t read = 0;
4592 int64_t num_multireads = 0;
4593 int64_t found = 0;
4594 ReadOptions options(FLAGS_verify_checksum, true);
4595 std::vector<Slice> keys;
4596 std::vector<std::unique_ptr<const char[]> > key_guards;
4597 std::vector<std::string> values(entries_per_batch_);
4598 while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
4599 key_guards.push_back(std::unique_ptr<const char[]>());
4600 keys.push_back(AllocateKey(&key_guards.back()));
4601 }
4602
4603 Duration duration(FLAGS_duration, reads_);
4604 while (!duration.Done(1)) {
4605 DB* db = SelectDB(thread);
4606 for (int64_t i = 0; i < entries_per_batch_; ++i) {
4607 GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
4608 }
4609 std::vector<Status> statuses = db->MultiGet(options, keys, &values);
4610 assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
4611
4612 read += entries_per_batch_;
4613 num_multireads++;
4614 for (int64_t i = 0; i < entries_per_batch_; ++i) {
4615 if (statuses[i].ok()) {
4616 ++found;
4617 } else if (!statuses[i].IsNotFound()) {
4618 fprintf(stderr, "MultiGet returned an error: %s\n",
4619 statuses[i].ToString().c_str());
4620 abort();
4621 }
4622 }
4623 if (thread->shared->read_rate_limiter.get() != nullptr &&
4624 num_multireads % 256 == 255) {
4625 thread->shared->read_rate_limiter->Request(
4626 256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */,
4627 RateLimiter::OpType::kRead);
4628 }
4629 thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
4630 }
4631
4632 char msg[100];
4633 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
4634 found, read);
4635 thread->stats.AddMessage(msg);
4636 }
4637
4638 // The inverse of the Pareto CDF
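// For u in (0, 1]: returns theta + sigma * (u^(-k) - 1) / k, or
// theta - sigma * ln(u) when k == 0, rounded up to an integer.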
4639 int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
4640 double ret;
4641 if (k == 0.0) {
4642 ret = theta - sigma * std::log(u);
4643 } else {
4644 ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
4645 }
4646 return static_cast<int64_t>(ceil(ret));
4647 }
4648 // Inversion of y = a * x^b: returns x = (u / a)^(1 / b), rounded up
4649 int64_t PowerCdfInversion(double u, double a, double b) {
4650 double ret;
4651 ret = std::pow((u / a), (1 / b));
4652 return static_cast<int64_t>(ceil(ret));
4653 }
4654
4655 // Add noise to the QPS
4656 double AddNoise(double origin, double noise_ratio) {
4657 if (noise_ratio < 0.0 || noise_ratio > 1.0) {
4658 return origin;
4659 }
4660 int band_int = static_cast<int>(FLAGS_sine_a);
4661 double delta = (rand() % band_int - band_int / 2) * noise_ratio;
4662 if (origin + delta < 0) {
4663 return origin;
4664 } else {
4665 return (origin + delta);
4666 }
4667 }
4668
4669 // Decide the query type:
4670 // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 Merge
4671 class QueryDecider {
4672 public:
4673 std::vector<int> type_;
4674 std::vector<double> ratio_;
4675 int range_;
4676
4677 QueryDecider() {}
4678 ~QueryDecider() {}
4679
4680 Status Initiate(std::vector<double> ratio_input) {
4681 int range_max = 1000;
4682 double sum = 0.0;
4683 for (auto& ratio : ratio_input) {
4684 sum += ratio;
4685 }
4686 range_ = 0;
4687 for (auto& ratio : ratio_input) {
4688 range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
4689 type_.push_back(range_);
4690 ratio_.push_back(ratio / sum);
4691 }
4692 return Status::OK();
4693 }
4694
4695 int GetType(int64_t rand_num) {
4696 if (rand_num < 0) {
4697 rand_num = rand_num * (-1);
4698 }
4699 assert(range_ != 0);
4700 int pos = static_cast<int>(rand_num % range_);
4701 for (int i = 0; i < static_cast<int>(type_.size()); i++) {
4702 if (pos < type_[i]) {
4703 return i;
4704 }
4705 }
4706 return 0;
4707 }
4708 };
4709
4710 // The graph workload mixed with Get, Put, and Iterator queries
4711 void MixGraph(ThreadState* thread) {
4712 int64_t read = 0; // including single gets and Next of iterators
4713 int64_t gets = 0;
4714 int64_t puts = 0;
4715 int64_t found = 0;
4716 int64_t seek = 0;
4717 int64_t seek_found = 0;
4718 int64_t bytes = 0;
4719 const int64_t default_value_max = 1 * 1024 * 1024;
4720 int64_t value_max = default_value_max;
4721 int64_t scan_len_max = FLAGS_mix_max_scan_len;
4722 double write_rate = 1000000.0;
4723 double read_rate = 1000000.0;
4724 std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
4725 FLAGS_mix_seek_ratio};
4726 char value_buffer[default_value_max];
4727 QueryDecider query;
4728 RandomGenerator gen;
4729 Status s;
4730 if (value_max > FLAGS_mix_max_value_size) {
4731 value_max = FLAGS_mix_max_value_size;
4732 }
4733
4734 ReadOptions options(FLAGS_verify_checksum, true);
4735 std::unique_ptr<const char[]> key_guard;
4736 Slice key = AllocateKey(&key_guard);
4737 PinnableSlice pinnable_val;
4738 query.Initiate(ratio);
4739
4740 // Initialize the QPS rate limiters when a sine-modulated rate is requested
4741 if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
4742 thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
4743 read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
4744 RateLimiter::Mode::kReadsOnly));
4745 thread->shared->write_rate_limiter.reset(
4746 NewGenericRateLimiter(write_rate));
4747 }
4748
4749 Duration duration(FLAGS_duration, reads_);
4750 while (!duration.Done(1)) {
4751 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
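// Map a uniform draw through the power-law CDF inversion to get a key
// "rank", then hash that rank into [0, FLAGS_num) so key popularity
// follows the configured distribution without clustering.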
4752 int64_t rand_v, key_rand, key_seed;
4753 rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
4754 double u = static_cast<double>(rand_v) / FLAGS_num;
4755 key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
4756 Random64 rand(key_seed);
4757 key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
4758 GenerateKeyFromInt(key_rand, FLAGS_num, &key);
4759 int query_type = query.GetType(rand_v);
4760
4761 // change the qps
4762 uint64_t now = FLAGS_env->NowMicros();
4763 uint64_t usecs_since_last;
4764 if (now > thread->stats.GetSineInterval()) {
4765 usecs_since_last = now - thread->stats.GetSineInterval();
4766 } else {
4767 usecs_since_last = 0;
4768 }
4769
4770 if (usecs_since_last >
4771 (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
4772 double usecs_since_start =
4773 static_cast<double>(now - thread->stats.GetStart());
4774 thread->stats.ResetSineInterval();
4775 double mix_rate_with_noise = AddNoise(
4776 SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
4777 read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
4778 write_rate =
4779 mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
4780
4781 thread->shared->write_rate_limiter.reset(
4782 NewGenericRateLimiter(write_rate));
4783 thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
4784 read_rate,
4785 FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
4786 RateLimiter::Mode::kReadsOnly));
4787 }
4788 // Start the query
4789 if (query_type == 0) {
4790 // the Get query
4791 gets++;
4792 read++;
4793 if (FLAGS_num_column_families > 1) {
4794 s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
4795 &pinnable_val);
4796 } else {
4797 pinnable_val.Reset();
4798 s = db_with_cfh->db->Get(options,
4799 db_with_cfh->db->DefaultColumnFamily(), key,
4800 &pinnable_val);
4801 }
4802
4803 if (s.ok()) {
4804 found++;
4805 bytes += key.size() + pinnable_val.size();
4806 } else if (!s.IsNotFound()) {
4807 fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
4808 abort();
4809 }
4810
4811 if (thread->shared->read_rate_limiter.get() != nullptr &&
4812 read % 256 == 255) {
4813 thread->shared->read_rate_limiter->Request(
4814 256, Env::IO_HIGH, nullptr /* stats */,
4815 RateLimiter::OpType::kRead);
4816 }
4817 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
4818 } else if (query_type == 1) {
4819 // the Put query
4820 puts++;
4821 int64_t value_size = ParetoCdfInversion(
4822 u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
4823 if (value_size < 0) {
4824 value_size = 10;
4825 } else if (value_size > value_max) {
4826 value_size = value_size % value_max;
4827 }
4828 s = db_with_cfh->db->Put(
4829 write_options_, key,
4830 gen.Generate(static_cast<unsigned int>(value_size)));
4831 if (!s.ok()) {
4832 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
4833 exit(1);
4834 }
4835
4836 if (thread->shared->write_rate_limiter) {
4837 thread->shared->write_rate_limiter->Request(
4838 key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
4839 RateLimiter::OpType::kWrite);
4840 }
4841 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
4842 } else if (query_type == 2) {
4843 // Seek query
4844 if (db_with_cfh->db != nullptr) {
4845 Iterator* single_iter = nullptr;
4846 single_iter = db_with_cfh->db->NewIterator(options);
4847 if (single_iter != nullptr) {
4848 single_iter->Seek(key);
4849 seek++;
4850 read++;
4851 if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
4852 seek_found++;
4853 }
4854 int64_t scan_length =
4855 ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
4856 FLAGS_iter_sigma) %
4857 scan_len_max;
4858 for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
4859 Slice value = single_iter->value();
4860 memcpy(value_buffer, value.data(),
4861 std::min(value.size(), sizeof(value_buffer)));
4862 bytes += single_iter->key().size() + single_iter->value().size();
4863 single_iter->Next();
4864 assert(single_iter->status().ok());
4865 }
4866 }
4867 delete single_iter;
4868 }
4869 thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
4870 }
4871 }
4872 char msg[256];
4873 snprintf(msg, sizeof(msg),
4874 "( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
4875 " in %" PRIu64 " found)\n",
4876 gets, puts, seek, found, read);
4877
4878 thread->stats.AddBytes(bytes);
4879 thread->stats.AddMessage(msg);
4880
4881 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
4882 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
4883 get_perf_context()->ToString());
4884 }
4885 }
4886
4887 void IteratorCreation(ThreadState* thread) {
4888 Duration duration(FLAGS_duration, reads_);
4889 ReadOptions options(FLAGS_verify_checksum, true);
4890 while (!duration.Done(1)) {
4891 DB* db = SelectDB(thread);
4892 Iterator* iter = db->NewIterator(options);
4893 delete iter;
4894 thread->stats.FinishedOps(nullptr, db, 1, kOthers);
4895 }
4896 }
4897
4898 void IteratorCreationWhileWriting(ThreadState* thread) {
4899 if (thread->tid > 0) {
4900 IteratorCreation(thread);
4901 } else {
4902 BGWriter(thread, kWrite);
4903 }
4904 }
4905
4906 void SeekRandom(ThreadState* thread) {
4907 int64_t read = 0;
4908 int64_t found = 0;
4909 int64_t bytes = 0;
4910 ReadOptions options(FLAGS_verify_checksum, true);
4911 options.tailing = FLAGS_use_tailing_iterator;
4912
4913 Iterator* single_iter = nullptr;
4914 std::vector<Iterator*> multi_iters;
4915 if (db_.db != nullptr) {
4916 single_iter = db_.db->NewIterator(options);
4917 } else {
4918 for (const auto& db_with_cfh : multi_dbs_) {
4919 multi_iters.push_back(db_with_cfh.db->NewIterator(options));
4920 }
4921 }
4922
4923 std::unique_ptr<const char[]> key_guard;
4924 Slice key = AllocateKey(&key_guard);
4925
4926 std::unique_ptr<const char[]> upper_bound_key_guard;
4927 Slice upper_bound = AllocateKey(&upper_bound_key_guard);
4928 std::unique_ptr<const char[]> lower_bound_key_guard;
4929 Slice lower_bound = AllocateKey(&lower_bound_key_guard);
4930
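// Hedged note: when FLAGS_max_scan_distance is non-zero, each seek below
// bounds the iterator to roughly [seek_pos - distance, seek_pos] for reverse
// scans (via iterate_lower_bound) or [seek_pos, seek_pos + distance] for
// forward scans (via iterate_upper_bound), so a scan cannot run past the
// configured window.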
4931 Duration duration(FLAGS_duration, reads_);
4932 char value_buffer[256];
4933 while (!duration.Done(1)) {
4934 int64_t seek_pos = thread->rand.Next() % FLAGS_num;
4935 GenerateKeyFromInt((uint64_t)seek_pos, FLAGS_num, &key);
4936 if (FLAGS_max_scan_distance != 0) {
4937 if (FLAGS_reverse_iterator) {
4938 GenerateKeyFromInt(
4939 static_cast<uint64_t>(std::max(
4940 static_cast<int64_t>(0), seek_pos - FLAGS_max_scan_distance)),
4941 FLAGS_num, &lower_bound);
4942 options.iterate_lower_bound = &lower_bound;
4943 } else {
4944 GenerateKeyFromInt(
4945 (uint64_t)std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance),
4946 FLAGS_num, &upper_bound);
4947 options.iterate_upper_bound = &upper_bound;
4948 }
4949 }
4950
4951 if (!FLAGS_use_tailing_iterator) {
4952 if (db_.db != nullptr) {
4953 delete single_iter;
4954 single_iter = db_.db->NewIterator(options);
4955 } else {
4956 for (auto iter : multi_iters) {
4957 delete iter;
4958 }
4959 multi_iters.clear();
4960 for (const auto& db_with_cfh : multi_dbs_) {
4961 multi_iters.push_back(db_with_cfh.db->NewIterator(options));
4962 }
4963 }
4964 }
4965 // Pick an Iterator to use
4966 Iterator* iter_to_use = single_iter;
4967 if (single_iter == nullptr) {
4968 iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
4969 }
4970
4971 iter_to_use->Seek(key);
4972 read++;
4973 if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
4974 found++;
4975 }
4976
4977 for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
4978 // Copy out the iterator's value to make sure we actually read it.
4979 Slice value = iter_to_use->value();
4980 memcpy(value_buffer, value.data(),
4981 std::min(value.size(), sizeof(value_buffer)));
4982 bytes += iter_to_use->key().size() + iter_to_use->value().size();
4983
4984 if (!FLAGS_reverse_iterator) {
4985 iter_to_use->Next();
4986 } else {
4987 iter_to_use->Prev();
4988 }
4989 assert(iter_to_use->status().ok());
4990 }
4991
4992 if (thread->shared->read_rate_limiter.get() != nullptr &&
4993 read % 256 == 255) {
4994 thread->shared->read_rate_limiter->Request(
4995 256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
4996 }
4997
4998 thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
4999 }
5000 delete single_iter;
5001 for (auto iter : multi_iters) {
5002 delete iter;
5003 }
5004
5005 char msg[100];
5006 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
5007 found, read);
5008 thread->stats.AddBytes(bytes);
5009 thread->stats.AddMessage(msg);
5010 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
5011 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
5012 get_perf_context()->ToString());
5013 }
5014 }
5015
5016 void SeekRandomWhileWriting(ThreadState* thread) {
5017 if (thread->tid > 0) {
5018 SeekRandom(thread);
5019 } else {
5020 BGWriter(thread, kWrite);
5021 }
5022 }
5023
5024 void SeekRandomWhileMerging(ThreadState* thread) {
5025 if (thread->tid > 0) {
5026 SeekRandom(thread);
5027 } else {
5028 BGWriter(thread, kMerge);
5029 }
5030 }
5031
5032 void DoDelete(ThreadState* thread, bool seq) {
5033 WriteBatch batch;
5034 Duration duration(seq ? 0 : FLAGS_duration, deletes_);
5035 int64_t i = 0;
5036 std::unique_ptr<const char[]> key_guard;
5037 Slice key = AllocateKey(&key_guard);
5038
5039 while (!duration.Done(entries_per_batch_)) {
5040 DB* db = SelectDB(thread);
5041 batch.Clear();
5042 for (int64_t j = 0; j < entries_per_batch_; ++j) {
5043 const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
5044 GenerateKeyFromInt(k, FLAGS_num, &key);
5045 batch.Delete(key);
5046 }
5047 auto s = db->Write(write_options_, &batch);
5048 thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
5049 if (!s.ok()) {
5050 fprintf(stderr, "del error: %s\n", s.ToString().c_str());
5051 exit(1);
5052 }
5053 i += entries_per_batch_;
5054 }
5055 }
5056
5057 void DeleteSeq(ThreadState* thread) {
5058 DoDelete(thread, true);
5059 }
5060
5061 void DeleteRandom(ThreadState* thread) {
5062 DoDelete(thread, false);
5063 }
5064
5065 void ReadWhileWriting(ThreadState* thread) {
5066 if (thread->tid > 0) {
5067 ReadRandom(thread);
5068 } else {
5069 BGWriter(thread, kWrite);
5070 }
5071 }
5072
5073 void ReadWhileMerging(ThreadState* thread) {
5074 if (thread->tid > 0) {
5075 ReadRandom(thread);
5076 } else {
5077 BGWriter(thread, kMerge);
5078 }
5079 }
5080
5081 void BGWriter(ThreadState* thread, enum OperationType write_merge) {
5082 // Special thread that keeps writing until other threads are done.
5083 RandomGenerator gen;
5084 int64_t bytes = 0;
5085
5086 std::unique_ptr<RateLimiter> write_rate_limiter;
5087 if (FLAGS_benchmark_write_rate_limit > 0) {
5088 write_rate_limiter.reset(
5089 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
5090 }
5091
5092 // Don't merge stats from this thread with the readers.
5093 thread->stats.SetExcludeFromMerge();
5094
5095 std::unique_ptr<const char[]> key_guard;
5096 Slice key = AllocateKey(&key_guard);
5097 uint32_t written = 0;
5098 bool hint_printed = false;
5099
5100 while (true) {
5101 DB* db = SelectDB(thread);
5102 {
5103 MutexLock l(&thread->shared->mu);
5104 if (FLAGS_finish_after_writes && written == writes_) {
5105 fprintf(stderr, "Exiting the writer after %u writes...\n", written);
5106 break;
5107 }
5108 if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
5109 // Other threads have finished
5110 if (FLAGS_finish_after_writes) {
5111 // Wait for the writes to be finished
5112 if (!hint_printed) {
5113 fprintf(stderr, "Reads are finished. Have %d more writes to do\n",
5114 (int)writes_ - written);
5115 hint_printed = true;
5116 }
5117 } else {
5118 // Finish the write immediately
5119 break;
5120 }
5121 }
5122 }
5123
5124 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
5125 Status s;
5126
5127 if (write_merge == kWrite) {
5128 s = db->Put(write_options_, key, gen.Generate(value_size_));
5129 } else {
5130 s = db->Merge(write_options_, key, gen.Generate(value_size_));
5131 }
5132 written++;
5133
5134 if (!s.ok()) {
5135 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
5136 exit(1);
5137 }
5138 bytes += key.size() + value_size_;
5139 thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
5140
5141 if (FLAGS_benchmark_write_rate_limit > 0) {
5142 write_rate_limiter->Request(
5143 entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
5144 nullptr /* stats */, RateLimiter::OpType::kWrite);
5145 }
5146 }
5147 thread->stats.AddBytes(bytes);
5148 }
5149
5150 void ReadWhileScanning(ThreadState* thread) {
5151 if (thread->tid > 0) {
5152 ReadRandom(thread);
5153 } else {
5154 BGScan(thread);
5155 }
5156 }
5157
5158 void BGScan(ThreadState* thread) {
5159 if (FLAGS_num_multi_db > 0) {
5160 fprintf(stderr, "Not supporting multiple DBs.\n");
5161 abort();
5162 }
5163 assert(db_.db != nullptr);
5164 ReadOptions read_options;
5165 Iterator* iter = db_.db->NewIterator(read_options);
5166
5167 fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_);
5168 Duration duration(FLAGS_duration, reads_);
5169 uint64_t num_seek_to_first = 0;
5170 uint64_t num_next = 0;
5171 while (!duration.Done(1)) {
5172 if (!iter->Valid()) {
5173 iter->SeekToFirst();
5174 num_seek_to_first++;
5175 } else if (!iter->status().ok()) {
5176 fprintf(stderr, "Iterator error: %s\n",
5177 iter->status().ToString().c_str());
5178 abort();
5179 } else {
5180 iter->Next();
5181 num_next++;
5182 }
5183
5184 thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
5185 }
5186 delete iter;
5187 }
5188
5189 // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
5190 // in the DB atomically, i.e. in a single batch. Also refer to GetMany.
5191 Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
5192 const Slice& value) {
5193 std::string suffixes[3] = {"2", "1", "0"};
5194 std::string keys[3];
5195
5196 WriteBatch batch;
5197 Status s;
5198 for (int i = 0; i < 3; i++) {
5199 keys[i] = key.ToString() + suffixes[i];
5200 batch.Put(keys[i], value);
5201 }
5202
5203 s = db->Write(writeoptions, &batch);
5204 return s;
5205 }
5206
5207
5208 // Given a key K, this deletes the keys K+"0", K+"1" and K+"2"
5209 // from the DB atomically, i.e. in a single batch. Also refer to GetMany.
5210 Status DeleteMany(DB* db, const WriteOptions& writeoptions,
5211 const Slice& key) {
5212 std::string suffixes[3] = {"1", "2", "0"};
5213 std::string keys[3];
5214
5215 WriteBatch batch;
5216 Status s;
5217 for (int i = 0; i < 3; i++) {
5218 keys[i] = key.ToString() + suffixes[i];
5219 batch.Delete(keys[i]);
5220 }
5221
5222 s = db->Write(writeoptions, &batch);
5223 return s;
5224 }
5225
5226 // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
5227 // in the same snapshot, and verifies that all the values are identical.
5228 // ASSUMES that PutMany was used to put (K, V) into the DB.
5229 Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
5230 std::string* value) {
5231 std::string suffixes[3] = {"0", "1", "2"};
5232 std::string keys[3];
5233 Slice key_slices[3];
5234 std::string values[3];
5235 ReadOptions readoptionscopy = readoptions;
5236 readoptionscopy.snapshot = db->GetSnapshot();
5237 Status s;
5238 for (int i = 0; i < 3; i++) {
5239 keys[i] = key.ToString() + suffixes[i];
5240 key_slices[i] = keys[i];
5241 s = db->Get(readoptionscopy, key_slices[i], value);
5242 if (!s.ok() && !s.IsNotFound()) {
5243 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
5244 values[i] = "";
5245 // we continue after error rather than exiting so that we can
5246 // find more errors if any
5247 } else if (s.IsNotFound()) {
5248 values[i] = "";
5249 } else {
5250 values[i] = *value;
5251 }
5252 }
5253 db->ReleaseSnapshot(readoptionscopy.snapshot);
5254
5255 if ((values[0] != values[1]) || (values[1] != values[2])) {
5256 fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
5257 key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
5258 values[2].c_str());
5259 // we continue after error rather than exiting so that we can
5260 // find more errors if any
5261 }
5262
5263 return s;
5264 }
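// Illustrative (hypothetical) example of the layout the *Many helpers agree
// on: a logical key "key42" is stored as the three physical keys "key420",
// "key421" and "key422", all written with the same value in one WriteBatch,
// so GetMany can read the three copies under a single snapshot and flag any
// mismatch as an inconsistency.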
5265
5266 // Differs from readrandomwriterandom in the following ways:
5267 // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
5268 // (b) Does deletes as well (per FLAGS_deletepercent)
5269 // (c) In order to achieve a high % of 'found' during lookups, and to do
5270 // multiple writes (including puts and deletes), it uses up to
5271 // FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
5272 // (d) Does not have a MultiGet option.
5273 void RandomWithVerify(ThreadState* thread) {
5274 ReadOptions options(FLAGS_verify_checksum, true);
5275 RandomGenerator gen;
5276 std::string value;
5277 int64_t found = 0;
5278 int get_weight = 0;
5279 int put_weight = 0;
5280 int delete_weight = 0;
5281 int64_t gets_done = 0;
5282 int64_t puts_done = 0;
5283 int64_t deletes_done = 0;
5284
5285 std::unique_ptr<const char[]> key_guard;
5286 Slice key = AllocateKey(&key_guard);
5287
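// Hedged sketch of the operation mix below: each cycle of 100 operations
// issues FLAGS_readwritepercent GetMany calls first, then
// (100 - readwritepercent - deletepercent) PutMany calls, then
// FLAGS_deletepercent DeleteMany calls, after which the weights are
// reinitialized for the next cycle.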
5288 // the number of iterations is the larger of reads_ or writes_
5289 for (int64_t i = 0; i < readwrites_; i++) {
5290 DB* db = SelectDB(thread);
5291 if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
5292 // one batch completed, reinitialize for next batch
5293 get_weight = FLAGS_readwritepercent;
5294 delete_weight = FLAGS_deletepercent;
5295 put_weight = 100 - get_weight - delete_weight;
5296 }
5297 GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
5298 FLAGS_numdistinct, &key);
5299 if (get_weight > 0) {
5300 // do all the gets first
5301 Status s = GetMany(db, options, key, &value);
5302 if (!s.ok() && !s.IsNotFound()) {
5303 fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
5304 // we continue after error rather than exiting so that we can
5305 // find more errors if any
5306 } else if (!s.IsNotFound()) {
5307 found++;
5308 }
5309 get_weight--;
5310 gets_done++;
5311 thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
5312 } else if (put_weight > 0) {
5313 // then do all the corresponding number of puts
5314 // for all the gets we have done earlier
5315 Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
5316 if (!s.ok()) {
5317 fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
5318 exit(1);
5319 }
5320 put_weight--;
5321 puts_done++;
5322 thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
5323 } else if (delete_weight > 0) {
5324 Status s = DeleteMany(db, write_options_, key);
5325 if (!s.ok()) {
5326 fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
5327 exit(1);
5328 }
5329 delete_weight--;
5330 deletes_done++;
5331 thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
5332 }
5333 }
5334 char msg[128];
5335 snprintf(msg, sizeof(msg),
5336 "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
5337 PRIu64 " found:%" PRIu64 ")",
5338 gets_done, puts_done, deletes_done, readwrites_, found);
5339 thread->stats.AddMessage(msg);
5340 }
5341
5342 // This is different from ReadWhileWriting because it does not use
5343 // an extra thread.
5344 void ReadRandomWriteRandom(ThreadState* thread) {
5345 ReadOptions options(FLAGS_verify_checksum, true);
5346 RandomGenerator gen;
5347 std::string value;
5348 int64_t found = 0;
5349 int get_weight = 0;
5350 int put_weight = 0;
5351 int64_t reads_done = 0;
5352 int64_t writes_done = 0;
5353 Duration duration(FLAGS_duration, readwrites_);
5354
5355 std::unique_ptr<const char[]> key_guard;
5356 Slice key = AllocateKey(&key_guard);
5357
5358 // the number of iterations is the larger of reads_ or writes_
5359 while (!duration.Done(1)) {
5360 DB* db = SelectDB(thread);
5361 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
5362 if (get_weight == 0 && put_weight == 0) {
5363 // one batch completed, reinitialize for next batch
5364 get_weight = FLAGS_readwritepercent;
5365 put_weight = 100 - get_weight;
5366 }
5367 if (get_weight > 0) {
5368 // do all the gets first
5369 Status s = db->Get(options, key, &value);
5370 if (!s.ok() && !s.IsNotFound()) {
5371 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
5372 // we continue after error rather than exiting so that we can
5373 // find more errors if any
5374 } else if (!s.IsNotFound()) {
5375 found++;
5376 }
5377 get_weight--;
5378 reads_done++;
5379 thread->stats.FinishedOps(nullptr, db, 1, kRead);
5380 } else if (put_weight > 0) {
5381 // then do all the corresponding number of puts
5382 // for all the gets we have done earlier
5383 Status s = db->Put(write_options_, key, gen.Generate(value_size_));
5384 if (!s.ok()) {
5385 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
5386 exit(1);
5387 }
5388 put_weight--;
5389 writes_done++;
5390 thread->stats.FinishedOps(nullptr, db, 1, kWrite);
5391 }
5392 }
5393 char msg[100];
5394 snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
5395 " total:%" PRIu64 " found:%" PRIu64 ")",
5396 reads_done, writes_done, readwrites_, found);
5397 thread->stats.AddMessage(msg);
5398 }
5399
5400 //
5401 // Read-modify-write for random keys
5402 void UpdateRandom(ThreadState* thread) {
5403 ReadOptions options(FLAGS_verify_checksum, true);
5404 RandomGenerator gen;
5405 std::string value;
5406 int64_t found = 0;
5407 int64_t bytes = 0;
5408 Duration duration(FLAGS_duration, readwrites_);
5409
5410 std::unique_ptr<const char[]> key_guard;
5411 Slice key = AllocateKey(&key_guard);
5412 // the number of iterations is the larger of reads_ or writes_
5413 while (!duration.Done(1)) {
5414 DB* db = SelectDB(thread);
5415 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
5416
5417 auto status = db->Get(options, key, &value);
5418 if (status.ok()) {
5419 ++found;
5420 bytes += key.size() + value.size();
5421 } else if (!status.IsNotFound()) {
5422 fprintf(stderr, "Get returned an error: %s\n",
5423 status.ToString().c_str());
5424 abort();
5425 }
5426
5427 if (thread->shared->write_rate_limiter) {
5428 thread->shared->write_rate_limiter->Request(
5429 key.size() + value_size_, Env::IO_HIGH, nullptr /*stats*/,
5430 RateLimiter::OpType::kWrite);
5431 }
5432
5433 Status s = db->Put(write_options_, key, gen.Generate(value_size_));
5434 if (!s.ok()) {
5435 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
5436 exit(1);
5437 }
5438 bytes += key.size() + value_size_;
5439 thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
5440 }
5441 char msg[100];
5442 snprintf(msg, sizeof(msg),
5443 "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
5444 thread->stats.AddBytes(bytes);
5445 thread->stats.AddMessage(msg);
5446 }
5447
5448 // Read-XOR-write for random keys. Xors the existing value with a randomly
5449 // generated value, and stores the result. Assuming A is the array of bytes
5450 // representing the existing value, we generate an array B of the same size,
5451 // then compute C = A^B as C[i]=A[i]^B[i], and store C
5452 void XORUpdateRandom(ThreadState* thread) {
5453 ReadOptions options(FLAGS_verify_checksum, true);
5454 RandomGenerator gen;
5455 std::string existing_value;
5456 int64_t found = 0;
5457 Duration duration(FLAGS_duration, readwrites_);
5458
5459 BytesXOROperator xor_operator;
5460
5461 std::unique_ptr<const char[]> key_guard;
5462 Slice key = AllocateKey(&key_guard);
5463 // the number of iterations is the larger of reads_ or writes_
5464 while (!duration.Done(1)) {
5465 DB* db = SelectDB(thread);
5466 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
5467
5468 auto status = db->Get(options, key, &existing_value);
5469 if (status.ok()) {
5470 ++found;
5471 } else if (!status.IsNotFound()) {
5472 fprintf(stderr, "Get returned an error: %s\n",
5473 status.ToString().c_str());
5474 exit(1);
5475 }
5476
5477 Slice value = gen.Generate(value_size_);
5478 std::string new_value;
5479
5480 if (status.ok()) {
5481 Slice existing_value_slice = Slice(existing_value);
5482 xor_operator.XOR(&existing_value_slice, value, &new_value);
5483 } else {
5484 xor_operator.XOR(nullptr, value, &new_value);
5485 }
5486
5487 Status s = db->Put(write_options_, key, Slice(new_value));
5488 if (!s.ok()) {
5489 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
5490 exit(1);
5491 }
5492 thread->stats.FinishedOps(nullptr, db, 1);
5493 }
5494 char msg[100];
5495 snprintf(msg, sizeof(msg),
5496 "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
5497 thread->stats.AddMessage(msg);
5498 }
5499
5500 // Read-modify-write for random keys.
5501 // Each operation causes the value to grow by value_size (simulating an append).
5502 // Generally used for benchmarking against merges of a similar type.
5503 void AppendRandom(ThreadState* thread) {
5504 ReadOptions options(FLAGS_verify_checksum, true);
5505 RandomGenerator gen;
5506 std::string value;
5507 int64_t found = 0;
5508 int64_t bytes = 0;
5509
5510 std::unique_ptr<const char[]> key_guard;
5511 Slice key = AllocateKey(&key_guard);
5512 // The number of iterations is the larger of reads_ or writes_
5513 Duration duration(FLAGS_duration, readwrites_);
5514 while (!duration.Done(1)) {
5515 DB* db = SelectDB(thread);
5516 GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
5517
5518 auto status = db->Get(options, key, &value);
5519 if (status.ok()) {
5520 ++found;
5521 bytes += key.size() + value.size();
5522 } else if (!status.IsNotFound()) {
5523 fprintf(stderr, "Get returned an error: %s\n",
5524 status.ToString().c_str());
5525 abort();
5526 } else {
5527 // If the key does not exist, just assume an empty string of data
5528 value.clear();
5529 }
5530
5531 // Update the value (by appending data)
5532 Slice operand = gen.Generate(value_size_);
5533 if (value.size() > 0) {
5534 // Use a delimiter to match the semantics for StringAppendOperator
5535 value.append(1,',');
5536 }
5537 value.append(operand.data(), operand.size());
5538
5539 // Write back to the database
5540 Status s = db->Put(write_options_, key, value);
5541 if (!s.ok()) {
5542 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
5543 exit(1);
5544 }
5545 bytes += key.size() + value.size();
5546 thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
5547 }
5548
5549 char msg[100];
5550 snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
5551 readwrites_, found);
5552 thread->stats.AddBytes(bytes);
5553 thread->stats.AddMessage(msg);
5554 }
5555
5556 // Read-modify-write for random keys (using MergeOperator)
5557 // The merge operator to use should be defined by FLAGS_merge_operator
5558 // Adjust FLAGS_value_size so that the merge operands are reasonable for this operator.
5559 // Assumes that the merge operator is non-null (i.e. it is well-defined).
5560 //
5561 // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
5562 // to simulate random additions over 64-bit integers using merge.
5563 //
5564 // The number of merges on the same key can be controlled by adjusting
5565 // FLAGS_merge_keys.
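// Hedged example: with FLAGS_merge_operator="uint64add" and
// FLAGS_value_size=8, each Merge() below behaves like "value += operand"
// (assuming the 8-byte operand is decoded as an unsigned 64-bit integer),
// so a key that received N merges ends up holding the sum of its N random
// operands once the merges are applied.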
5566 void MergeRandom(ThreadState* thread) {
5567 RandomGenerator gen;
5568 int64_t bytes = 0;
5569 std::unique_ptr<const char[]> key_guard;
5570 Slice key = AllocateKey(&key_guard);
5571 // The number of iterations is the larger of reads_ or writes_
5572 Duration duration(FLAGS_duration, readwrites_);
5573 while (!duration.Done(1)) {
5574 DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
5575 int64_t key_rand = thread->rand.Next() % merge_keys_;
5576 GenerateKeyFromInt(key_rand, merge_keys_, &key);
5577
5578 Status s;
5579 if (FLAGS_num_column_families > 1) {
5580 s = db_with_cfh->db->Merge(write_options_,
5581 db_with_cfh->GetCfh(key_rand), key,
5582 gen.Generate(value_size_));
5583 } else {
5584 s = db_with_cfh->db->Merge(write_options_,
5585 db_with_cfh->db->DefaultColumnFamily(), key,
5586 gen.Generate(value_size_));
5587 }
5588
5589 if (!s.ok()) {
5590 fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
5591 exit(1);
5592 }
5593 bytes += key.size() + value_size_;
5594 thread->stats.FinishedOps(nullptr, db_with_cfh->db, 1, kMerge);
5595 }
5596
5597 // Print some statistics
5598 char msg[100];
5599 snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
5600 thread->stats.AddBytes(bytes);
5601 thread->stats.AddMessage(msg);
5602 }
5603
5604 // Read and merge random keys. The amount of reads and merges are controlled
5605 // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
5606 // keys (and thus also the number of reads and merges on the same key) can be
5607 // adjusted with FLAGS_merge_keys.
5608 //
5609 // As with MergeRandom, the merge operator to use should be defined by
5610 // FLAGS_merge_operator.
5611 void ReadRandomMergeRandom(ThreadState* thread) {
5612 ReadOptions options(FLAGS_verify_checksum, true);
5613 RandomGenerator gen;
5614 std::string value;
5615 int64_t num_hits = 0;
5616 int64_t num_gets = 0;
5617 int64_t num_merges = 0;
5618 size_t max_length = 0;
5619
5620 std::unique_ptr<const char[]> key_guard;
5621 Slice key = AllocateKey(&key_guard);
5622 // the number of iterations is the larger of reads_ or writes_
5623 Duration duration(FLAGS_duration, readwrites_);
5624 while (!duration.Done(1)) {
5625 DB* db = SelectDB(thread);
5626 GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
5627
5628 bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
5629
5630 if (do_merge) {
5631 Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
5632 if (!s.ok()) {
5633 fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
5634 exit(1);
5635 }
5636 num_merges++;
5637 thread->stats.FinishedOps(nullptr, db, 1, kMerge);
5638 } else {
5639 Status s = db->Get(options, key, &value);
5640 if (value.length() > max_length)
5641 max_length = value.length();
5642
5643 if (!s.ok() && !s.IsNotFound()) {
5644 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
5645 // we continue after error rather than exiting so that we can
5646 // find more errors if any
5647 } else if (!s.IsNotFound()) {
5648 num_hits++;
5649 }
5650 num_gets++;
5651 thread->stats.FinishedOps(nullptr, db, 1, kRead);
5652 }
5653 }
5654
5655 char msg[100];
5656 snprintf(msg, sizeof(msg),
5657 "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64
5658 " hits:%" PRIu64 " maxlength:%" ROCKSDB_PRIszt ")",
5659 num_gets, num_merges, readwrites_, num_hits, max_length);
5660 thread->stats.AddMessage(msg);
5661 }
5662
5663 void WriteSeqSeekSeq(ThreadState* thread) {
5664 writes_ = FLAGS_num;
5665 DoWrite(thread, SEQUENTIAL);
5666 // exclude writes from the ops/sec calculation
5667 thread->stats.Start(thread->tid);
5668
5669 DB* db = SelectDB(thread);
5670 std::unique_ptr<Iterator> iter(
5671 db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));
5672
5673 std::unique_ptr<const char[]> key_guard;
5674 Slice key = AllocateKey(&key_guard);
5675 for (int64_t i = 0; i < FLAGS_num; ++i) {
5676 GenerateKeyFromInt(i, FLAGS_num, &key);
5677 iter->Seek(key);
5678 assert(iter->Valid() && iter->key() == key);
5679 thread->stats.FinishedOps(nullptr, db, 1, kSeek);
5680
5681 for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
5682 if (!FLAGS_reverse_iterator) {
5683 iter->Next();
5684 } else {
5685 iter->Prev();
5686 }
5687 GenerateKeyFromInt(++i, FLAGS_num, &key);
5688 assert(iter->Valid() && iter->key() == key);
5689 thread->stats.FinishedOps(nullptr, db, 1, kSeek);
5690 }
5691
5692 iter->Seek(key);
5693 assert(iter->Valid() && iter->key() == key);
5694 thread->stats.FinishedOps(nullptr, db, 1, kSeek);
5695 }
5696 }
5697
5698 #ifndef ROCKSDB_LITE
5699 // This benchmark stress tests Transactions. For a given --duration (or
5700 // total number of --writes), a Transaction will perform a read-modify-write
5701 // to increment the value of a key in each of N (--transaction_sets) sets of
5702 // keys (where each set has --num keys). If --threads is set, this will be
5703 // done in parallel.
5704 //
5705 // To test transactions, use --transaction_db=true. Not setting this
5706 // parameter will run the same benchmark without transactions.
5708 //
5709 // RandomTransactionVerify() will then validate the correctness of the results
5710 // by checking if the sum of all keys in each set is the same.
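// Hedged sketch of what one iteration is assumed to do per key set s
// (key_in_set() and Encode()/Decode() are hypothetical helpers standing in
// for RandomTransactionInserter's internals):
//   txn = txn_db->BeginTransaction(write_options_);
//   txn->GetForUpdate(read_options, key_in_set(s), &val);
//   txn->Put(key_in_set(s), Encode(Decode(val) + 1));
//   txn->Commit();
// Incrementing one key in every set keeps the per-set sums equal, which is
// what RandomTransactionVerify() checks.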
5711 void RandomTransaction(ThreadState* thread) {
5712 ReadOptions options(FLAGS_verify_checksum, true);
5713 Duration duration(FLAGS_duration, readwrites_);
5714 ReadOptions read_options(FLAGS_verify_checksum, true);
5715 uint16_t num_prefix_ranges = static_cast<uint16_t>(FLAGS_transaction_sets);
5716 uint64_t transactions_done = 0;
5717
5718 if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
5719 fprintf(stderr, "invalid value for transaction_sets\n");
5720 abort();
5721 }
5722
5723 TransactionOptions txn_options;
5724 txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
5725 txn_options.set_snapshot = FLAGS_transaction_set_snapshot;
5726
5727 RandomTransactionInserter inserter(&thread->rand, write_options_,
5728 read_options, FLAGS_num,
5729 num_prefix_ranges);
5730
5731 if (FLAGS_num_multi_db > 1) {
5732 fprintf(stderr,
5733 "Cannot run RandomTransaction benchmark with "
5734 "FLAGS_multi_db > 1.");
5735 abort();
5736 }
5737
5738 while (!duration.Done(1)) {
5739 bool success;
5740
5741 // RandomTransactionInserter will attempt to insert a key for each
5742 // # of FLAGS_transaction_sets
5743 if (FLAGS_optimistic_transaction_db) {
5744 success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
5745 } else if (FLAGS_transaction_db) {
5746 TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);
5747 success = inserter.TransactionDBInsert(txn_db, txn_options);
5748 } else {
5749 success = inserter.DBInsert(db_.db);
5750 }
5751
5752 if (!success) {
5753 fprintf(stderr, "Unexpected error: %s\n",
5754 inserter.GetLastStatus().ToString().c_str());
5755 abort();
5756 }
5757
5758 thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
5759 transactions_done++;
5760 }
5761
5762 char msg[100];
5763 if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
5764 snprintf(msg, sizeof(msg),
5765 "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
5766 transactions_done, inserter.GetFailureCount());
5767 } else {
5768 snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
5769 }
5770 thread->stats.AddMessage(msg);
5771
5772 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
5773 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
5774 get_perf_context()->ToString());
5775 }
5776 thread->stats.AddBytes(static_cast<int64_t>(inserter.GetBytesInserted()));
5777 }
5778
5779 // Verifies consistency of data after RandomTransaction() has been run.
5780 // Since each iteration of RandomTransaction() incremented a key in each set
5781 // by the same value, the sum of the keys in each set should be the same.
5782 void RandomTransactionVerify() {
5783 if (!FLAGS_transaction_db && !FLAGS_optimistic_transaction_db) {
5784 // transactions not used, nothing to verify.
5785 return;
5786 }
5787
5788 Status s =
5789 RandomTransactionInserter::Verify(db_.db,
5790 static_cast<uint16_t>(FLAGS_transaction_sets));
5791
5792 if (s.ok()) {
5793 fprintf(stdout, "RandomTransactionVerify Success.\n");
5794 } else {
5795 fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
5796 }
5797 }
5798 #endif // ROCKSDB_LITE
5799
5800 // Writes and deletes random keys without overwriting keys.
5801 //
5802 // This benchmark is intended to partially replicate the behavior of MyRocks
5803 // secondary indices: All data is stored in keys and updates happen by
5804 // deleting the old version of the key and inserting the new version.
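// Hedged sketch of the key layout used below: each of the FLAGS_numdistinct
// logical ids owns a window of max_counter (50) key slots, and the live
// version of id i sits at slot i * max_counter + counters[i]; an update
// (Single)Deletes the current slot and Puts an empty value at the next slot
// in the window, so no key is ever overwritten in place.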
5805 void RandomReplaceKeys(ThreadState* thread) {
5806 std::unique_ptr<const char[]> key_guard;
5807 Slice key = AllocateKey(&key_guard);
5808 std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
5809 size_t max_counter = 50;
5810 RandomGenerator gen;
5811
5812 Status s;
5813 DB* db = SelectDB(thread);
5814 for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
5815 GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
5816 s = db->Put(write_options_, key, gen.Generate(value_size_));
5817 if (!s.ok()) {
5818 fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
5819 exit(1);
5820 }
5821 }
5822
5823 db->GetSnapshot();
5824
5825 std::default_random_engine generator;
5826 std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
5827 FLAGS_stddev);
5828 Duration duration(FLAGS_duration, FLAGS_num);
5829 while (!duration.Done(1)) {
5830 int64_t rnd_id = static_cast<int64_t>(distribution(generator));
5831 int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
5832 static_cast<int64_t>(0));
5833 GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
5834 &key);
5835 s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
5836 : db->Delete(write_options_, key);
5837 if (s.ok()) {
5838 counters[key_id] = (counters[key_id] + 1) % max_counter;
5839 GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
5840 &key);
5841 s = db->Put(write_options_, key, Slice());
5842 }
5843
5844 if (!s.ok()) {
5845 fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
5846 exit(1);
5847 }
5848
5849 thread->stats.FinishedOps(nullptr, db, 1, kOthers);
5850 }
5851
5852 char msg[200];
5853 snprintf(msg, sizeof(msg),
5854 "use single deletes: %d, "
5855 "standard deviation: %lf\n",
5856 FLAGS_use_single_deletes, FLAGS_stddev);
5857 thread->stats.AddMessage(msg);
5858 }
5859
5860 void TimeSeriesReadOrDelete(ThreadState* thread, bool do_deletion) {
5861 ReadOptions options(FLAGS_verify_checksum, true);
5862 int64_t read = 0;
5863 int64_t found = 0;
5864 int64_t bytes = 0;
5865
5866 Iterator* iter = nullptr;
5867 // Only work on single database
5868 assert(db_.db != nullptr);
5869 iter = db_.db->NewIterator(options);
5870
5871 std::unique_ptr<const char[]> key_guard;
5872 Slice key = AllocateKey(&key_guard);
5873
5874 char value_buffer[256];
5875 while (true) {
5876 {
5877 MutexLock l(&thread->shared->mu);
5878 if (thread->shared->num_done >= 1) {
5879 // The write thread has finished
5880 break;
5881 }
5882 }
5883 if (!FLAGS_use_tailing_iterator) {
5884 delete iter;
5885 iter = db_.db->NewIterator(options);
5886 }
5887 // Pick a random key id and seek to its prefix
5888
5889 int64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
5890 GenerateKeyFromInt(key_id, FLAGS_num, &key);
5891 // Reset last 8 bytes to 0
5892 char* start = const_cast<char*>(key.data());
5893 start += key.size() - 8;
5894 memset(start, 0, 8);
5895 ++read;
5896
5897 bool key_found = false;
5898 // Seek the prefix
5899 for (iter->Seek(key); iter->Valid() && iter->key().starts_with(key);
5900 iter->Next()) {
5901 key_found = true;
5902 // Copy out the iterator's value to make sure we actually read it.
5903 if (do_deletion) {
5904 bytes += iter->key().size();
5905 if (KeyExpired(timestamp_emulator_.get(), iter->key())) {
5906 thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
5907 db_.db->Delete(write_options_, iter->key());
5908 } else {
5909 break;
5910 }
5911 } else {
5912 bytes += iter->key().size() + iter->value().size();
5913 thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
5914 Slice value = iter->value();
5915 memcpy(value_buffer, value.data(),
5916 std::min(value.size(), sizeof(value_buffer)));
5917
5918 assert(iter->status().ok());
5919 }
5920 }
5921 found += key_found;
5922
5923 if (thread->shared->read_rate_limiter.get() != nullptr) {
5924 thread->shared->read_rate_limiter->Request(
5925 1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
5926 }
5927 }
5928 delete iter;
5929
5930 char msg[100];
5931 snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found,
5932 read);
5933 thread->stats.AddBytes(bytes);
5934 thread->stats.AddMessage(msg);
5935 if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
5936 thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
5937 get_perf_context()->ToString());
5938 }
5939 }
5940
5941 void TimeSeriesWrite(ThreadState* thread) {
5942 // Special thread that keeps writing until other threads are done.
5943 RandomGenerator gen;
5944 int64_t bytes = 0;
5945
5946 // Don't merge stats from this thread with the readers.
5947 thread->stats.SetExcludeFromMerge();
5948
5949 std::unique_ptr<RateLimiter> write_rate_limiter;
5950 if (FLAGS_benchmark_write_rate_limit > 0) {
5951 write_rate_limiter.reset(
5952 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
5953 }
5954
5955 std::unique_ptr<const char[]> key_guard;
5956 Slice key = AllocateKey(&key_guard);
5957
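// Hedged note on the key layout built below: the first 8 bytes carry the
// key id and the next (up to) 8 bytes carry the emulated timestamp, written
// most-significant byte first on little-endian hosts, presumably so that
// bytewise key ordering matches timestamp ordering within a series.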
5958 Duration duration(FLAGS_duration, writes_);
5959 while (!duration.Done(1)) {
5960 DB* db = SelectDB(thread);
5961
5962 uint64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
5963 // Write key id
5964 GenerateKeyFromInt(key_id, FLAGS_num, &key);
5965 // Write timestamp
5966
5967 char* start = const_cast<char*>(key.data());
5968 char* pos = start + 8;
5969 int bytes_to_fill =
5970 std::min(key_size_ - static_cast<int>(pos - start), 8);
5971 uint64_t timestamp_value = timestamp_emulator_->Get();
5972 if (port::kLittleEndian) {
5973 for (int i = 0; i < bytes_to_fill; ++i) {
5974 pos[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
5975 }
5976 } else {
5977 memcpy(pos, static_cast<void*>(&timestamp_value), bytes_to_fill);
5978 }
5979
5980 timestamp_emulator_->Inc();
5981
5982 Status s;
5983
5984 s = db->Put(write_options_, key, gen.Generate(value_size_));
5985
5986 if (!s.ok()) {
5987 fprintf(stderr, "put error: %s\n", s.ToString().c_str());
5988 exit(1);
5989 }
5990 bytes = key.size() + value_size_;
5991 thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
5992 thread->stats.AddBytes(bytes);
5993
5994 if (FLAGS_benchmark_write_rate_limit > 0) {
5995 write_rate_limiter->Request(
5996 entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
5997 nullptr /* stats */, RateLimiter::OpType::kWrite);
5998 }
5999 }
6000 }
6001
6002 void TimeSeries(ThreadState* thread) {
6003 if (thread->tid > 0) {
6004 bool do_deletion = FLAGS_expire_style == "delete" &&
6005 thread->tid <= FLAGS_num_deletion_threads;
6006 TimeSeriesReadOrDelete(thread, do_deletion);
6007 } else {
6008 TimeSeriesWrite(thread);
6009 thread->stats.Stop();
6010 thread->stats.Report("timeseries write");
6011 }
6012 }
6013
6014 void Compact(ThreadState* thread) {
6015 DB* db = SelectDB(thread);
6016 CompactRangeOptions cro;
6017 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
6018 db->CompactRange(cro, nullptr, nullptr);
6019 }
6020
6021 void CompactAll() {
6022 if (db_.db != nullptr) {
6023 db_.db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
6024 }
6025 for (const auto& db_with_cfh : multi_dbs_) {
6026 db_with_cfh.db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
6027 }
6028 }
6029
6030 void ResetStats() {
6031 if (db_.db != nullptr) {
6032 db_.db->ResetStats();
6033 }
6034 for (const auto& db_with_cfh : multi_dbs_) {
6035 db_with_cfh.db->ResetStats();
6036 }
6037 }
6038
6039 void PrintStats(const char* key) {
6040 if (db_.db != nullptr) {
6041 PrintStats(db_.db, key, false);
6042 }
6043 for (const auto& db_with_cfh : multi_dbs_) {
6044 PrintStats(db_with_cfh.db, key, true);
6045 }
6046 }
6047
6048 void PrintStats(DB* db, const char* key, bool print_header = false) {
6049 if (print_header) {
6050 fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
6051 }
6052 std::string stats;
6053 if (!db->GetProperty(key, &stats)) {
6054 stats = "(failed)";
6055 }
6056 fprintf(stdout, "\n%s\n", stats.c_str());
6057 }
6058
6059 void Replay(ThreadState* thread) {
6060 if (db_.db != nullptr) {
6061 Replay(thread, &db_);
6062 }
6063 }
6064
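// Hedged note: the trace consumed here is expected to have been recorded
// beforehand (e.g. via DB::StartTrace writing to FLAGS_trace_file); the
// Replayer then re-issues the recorded operations against this database.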
6065 void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) {
6066 Status s;
6067 std::unique_ptr<TraceReader> trace_reader;
6068 s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file,
6069 &trace_reader);
6070 if (!s.ok()) {
6071 fprintf(
6072 stderr,
6073 "Encountered an error creating a TraceReader from the trace file. "
6074 "Error: %s\n",
6075 s.ToString().c_str());
6076 exit(1);
6077 }
6078 Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
6079 std::move(trace_reader));
6080 s = replayer.Replay();
6081 if (s.ok()) {
6082 fprintf(stdout, "Replay started from trace_file: %s\n",
6083 FLAGS_trace_file.c_str());
6084 } else {
6085 fprintf(stderr, "Starting replay failed. Error: %s\n",
6086 s.ToString().c_str());
6087 }
6088 }
6089 };
6090
6091 int db_bench_tool(int argc, char** argv) {
6092 rocksdb::port::InstallStackTraceHandler();
6093 static bool initialized = false;
6094 if (!initialized) {
6095 SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
6096 " [OPTIONS]...");
6097 initialized = true;
6098 }
6099 ParseCommandLineFlags(&argc, &argv, true);
6100 FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
6101 #ifndef ROCKSDB_LITE
6102 if (FLAGS_statistics && !FLAGS_statistics_string.empty()) {
6103 fprintf(stderr,
6104 "Cannot provide both --statistics and --statistics_string.\n");
6105 exit(1);
6106 }
6107 if (!FLAGS_statistics_string.empty()) {
6108 std::unique_ptr<Statistics> custom_stats_guard;
6109 dbstats.reset(NewCustomObject<Statistics>(FLAGS_statistics_string,
6110 &custom_stats_guard));
6111 custom_stats_guard.release();
6112 if (dbstats == nullptr) {
6113 fprintf(stderr, "No Statistics registered matching string: %s\n",
6114 FLAGS_statistics_string.c_str());
6115 exit(1);
6116 }
6117 }
6118 #endif // ROCKSDB_LITE
6119 if (FLAGS_statistics) {
6120 dbstats = rocksdb::CreateDBStatistics();
6121 }
6122 if (dbstats) {
6123 dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
6124 }
6125 FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
6126
6127 std::vector<std::string> fanout = rocksdb::StringSplit(
6128 FLAGS_max_bytes_for_level_multiplier_additional, ',');
6129 for (size_t j = 0; j < fanout.size(); j++) {
6130 FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
6131 #ifndef CYGWIN
6132 std::stoi(fanout[j]));
6133 #else
6134 stoi(fanout[j]));
6135 #endif
6136 }
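// Hedged example: passing --max_bytes_for_level_multiplier_additional=2,3,5
// splits the string on ',' and fills the vector with {2, 3, 5}, which is
// used when the Options are assembled.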
6137
6138 FLAGS_compression_type_e =
6139 StringToCompressionType(FLAGS_compression_type.c_str());
6140
6141 #ifndef ROCKSDB_LITE
6142 std::unique_ptr<Env> custom_env_guard;
6143 if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
6144 fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
6145 exit(1);
6146 } else if (!FLAGS_env_uri.empty()) {
6147 FLAGS_env = NewCustomObject<Env>(FLAGS_env_uri, &custom_env_guard);
6148 if (FLAGS_env == nullptr) {
6149 fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
6150 exit(1);
6151 }
6152 }
6153 #endif // ROCKSDB_LITE
6154 if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {
6155 fprintf(stderr,
6156 "`-use_existing_db` must be true for `-use_existing_keys` to be "
6157 "settable\n");
6158 exit(1);
6159 }
6160
6161 if (!FLAGS_hdfs.empty()) {
6162 FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
6163 }
6164
6165 if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
6166 FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
6167 else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
6168 FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
6169 else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
6170 FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
6171 else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
6172 FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
6173 else {
6174 fprintf(stdout, "Unknown compaction fadvice:%s\n",
6175 FLAGS_compaction_fadvice.c_str());
6176 }
6177
6178 FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
6179
6180 // Note options sanitization may increase thread pool sizes according to
6181 // max_background_flushes/max_background_compactions/max_background_jobs
6182 FLAGS_env->SetBackgroundThreads(FLAGS_num_high_pri_threads,
6183 rocksdb::Env::Priority::HIGH);
6184 FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
6185 rocksdb::Env::Priority::BOTTOM);
6186 FLAGS_env->SetBackgroundThreads(FLAGS_num_low_pri_threads,
6187 rocksdb::Env::Priority::LOW);
6188
6189 // Choose a location for the test database if none given with --db=<path>
6190 if (FLAGS_db.empty()) {
6191 std::string default_db_path;
6192 rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
6193 default_db_path += "/dbbench";
6194 FLAGS_db = default_db_path;
6195 }
6196
6197 if (FLAGS_stats_interval_seconds > 0) {
6198 // When both are set, FLAGS_stats_interval determines how often the timer
6199 // is checked against FLAGS_stats_interval_seconds
6200 FLAGS_stats_interval = 1000;
6201 }
6202
6203 rocksdb::Benchmark benchmark;
6204 benchmark.Run();
6205
6206 #ifndef ROCKSDB_LITE
6207 if (FLAGS_print_malloc_stats) {
6208 std::string stats_string;
6209 rocksdb::DumpMallocStats(&stats_string);
6210 fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
6211 }
6212 #endif // ROCKSDB_LITE
6213
6214 return 0;
6215 }
6216 } // namespace rocksdb
6217 #endif