1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #ifndef __STDC_FORMAT_MACROS
11 #define __STDC_FORMAT_MACROS
27 #include <sys/types.h>
29 #include <condition_variable>
34 #include <unordered_map>
36 #include "db/db_impl.h"
37 #include "db/malloc_stats.h"
38 #include "db/version_set.h"
39 #include "hdfs/env_hdfs.h"
40 #include "monitoring/histogram.h"
41 #include "monitoring/statistics.h"
42 #include "options/cf_options.h"
43 #include "port/port.h"
44 #include "port/stack_trace.h"
45 #include "rocksdb/cache.h"
46 #include "rocksdb/db.h"
47 #include "rocksdb/env.h"
48 #include "rocksdb/filter_policy.h"
49 #include "rocksdb/memtablerep.h"
50 #include "rocksdb/options.h"
51 #include "rocksdb/perf_context.h"
52 #include "rocksdb/persistent_cache.h"
53 #include "rocksdb/rate_limiter.h"
54 #include "rocksdb/slice.h"
55 #include "rocksdb/slice_transform.h"
56 #include "rocksdb/utilities/object_registry.h"
57 #include "rocksdb/utilities/optimistic_transaction_db.h"
58 #include "rocksdb/utilities/options_util.h"
59 #include "rocksdb/utilities/sim_cache.h"
60 #include "rocksdb/utilities/transaction.h"
61 #include "rocksdb/utilities/transaction_db.h"
62 #include "rocksdb/write_batch.h"
63 #include "util/cast_util.h"
64 #include "util/compression.h"
65 #include "util/crc32c.h"
66 #include "util/gflags_compat.h"
67 #include "util/mutexlock.h"
68 #include "util/random.h"
69 #include "util/stderr_logger.h"
70 #include "util/string_util.h"
71 #include "util/testutil.h"
72 #include "util/transaction_test_util.h"
73 #include "util/xxhash.h"
74 #include "utilities/blob_db/blob_db.h"
75 #include "utilities/merge_operators.h"
76 #include "utilities/merge_operators/bytesxor.h"
77 #include "utilities/persistent_cache/block_cache_tier.h"
80 #include <io.h> // open/close
83 using GFLAGS_NAMESPACE::ParseCommandLineFlags
;
84 using GFLAGS_NAMESPACE::RegisterFlagValidator
;
85 using GFLAGS_NAMESPACE::SetUsageMessage
;
90 "fillseqdeterministic,"
93 "filluniquerandomdeterministic,"
97 "newiteratorwhilewriting,"
99 "seekrandomwhilewriting,"
100 "seekrandomwhilemerging,"
113 "readrandomwriterandom,"
128 "Comma-separated list of operations to run in the specified"
129 " order. Available benchmarks:\n"
130 "\tfillseq -- write N values in sequential key"
131 " order in async mode\n"
132 "\tfillseqdeterministic -- write N values in the specified"
133 " key order and keep the shape of the LSM tree\n"
134 "\tfillrandom -- write N values in random key order in async"
136 "\tfilluniquerandomdeterministic -- write N values in a random"
137 " key order and keep the shape of the LSM tree\n"
138 "\toverwrite -- overwrite N values in random key order in"
140 "\tfillsync -- write N/100 values in random key order in "
142 "\tfill100K -- write N/1000 100K values in random order in"
144 "\tdeleteseq -- delete N keys in sequential order\n"
145 "\tdeleterandom -- delete N keys in random order\n"
146 "\treadseq -- read N times sequentially\n"
147 "\treadtocache -- 1 thread reading database sequentially\n"
148 "\treadreverse -- read N times in reverse order\n"
149 "\treadrandom -- read N times in random order\n"
150 "\treadmissing -- read N missing keys in random order\n"
151 "\treadwhilewriting -- 1 writer, N threads doing random "
153 "\treadwhilemerging -- 1 merger, N threads doing random "
155 "\treadwhilescanning -- 1 thread doing full table scan, "
156 "N threads doing random reads\n"
157 "\treadrandomwriterandom -- N threads doing random-read, "
159 "\tupdaterandom -- N threads doing read-modify-write for random "
161 "\txorupdaterandom -- N threads doing read-XOR-write for "
163 "\tappendrandom -- N threads doing read-modify-write with "
165 "\tmergerandom -- same as updaterandom/appendrandom using merge"
167 "Must be used with merge_operator\n"
168 "\treadrandommergerandom -- perform N random read-or-merge "
169 "operations. Must be used with merge_operator\n"
170 "\tnewiterator -- repeated iterator creation\n"
171 "\tseekrandom -- N random seeks, call Next seek_nexts times "
173 "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
175 "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
177 "\tcrc32c -- repeated crc32c of 4K of data\n"
178 "\txxhash -- repeated xxHash of 4K of data\n"
179 "\tacquireload -- load N*1000 times\n"
180 "\tfillseekseq -- write N values in sequential key, then read "
181 "them by seeking to each key\n"
182 "\trandomtransaction -- execute N random transactions and "
183 "verify correctness\n"
184 "\trandomreplacekeys -- randomly replaces N keys by deleting "
185 "the old version and putting the new version\n\n"
186 "\ttimeseries -- 1 writer generates time series data "
187 "and multiple readers doing random reads on id\n\n"
189 "\tcompact -- Compact the entire DB; If multiple, randomly choose one\n"
190 "\tcompactall -- Compact the entire DB\n"
191 "\tstats -- Print DB stats\n"
192 "\tresetstats -- Reset DB stats\n"
193 "\tlevelstats -- Print the number of files and bytes per level\n"
194 "\tsstables -- Print sstable info\n"
195 "\theapprofile -- Dump a heap profile (if supported by this port)\n"
196 "\treplay -- replay the trace file specified with trace_file\n");
198 DEFINE_int64(num
, 1000000, "Number of key/values to place in database");
200 DEFINE_int64(numdistinct
, 1000,
201 "Number of distinct keys to use. Used in RandomWithVerify to "
202 "read/write on fewer keys so that gets are more likely to find the"
203 " key and puts are more likely to update the same key");
205 DEFINE_int64(merge_keys
, -1,
206 "Number of distinct keys to use for MergeRandom and "
207 "ReadRandomMergeRandom. "
208 "If negative, there will be FLAGS_num keys.");
209 DEFINE_int32(num_column_families
, 1, "Number of Column Families to use.");
212 num_hot_column_families
, 0,
213 "Number of Hot Column Families. If more than 0, only write to this "
214 "number of column families. After finishing all the writes to them, "
215 "create new set of column families and insert to them. Only used "
216 "when num_column_families > 1.");
218 DEFINE_string(column_family_distribution
, "",
219 "Comma-separated list of percentages, where the ith element "
220 "indicates the probability of an op using the ith column family. "
221 "The number of elements must be `num_hot_column_families` if "
222 "specified; otherwise, it must be `num_column_families`. The "
223 "sum of elements must be 100. E.g., if `num_column_families=4`, "
224 "and `num_hot_column_families=0`, a valid list could be "
227 DEFINE_int64(reads
, -1, "Number of read operations to do. "
228 "If negative, do FLAGS_num reads.");
230 DEFINE_int64(deletes
, -1, "Number of delete operations to do. "
231 "If negative, do FLAGS_num deletions.");
233 DEFINE_int32(bloom_locality
, 0, "Control bloom filter probes locality");
235 DEFINE_int64(seed
, 0, "Seed base for random number generators. "
236 "When 0 it is deterministic.");
238 DEFINE_int32(threads
, 1, "Number of concurrent threads to run.");
240 DEFINE_int32(duration
, 0, "Time in seconds for the random-ops tests to run."
241 " When 0 then num & reads determine the test duration");
243 DEFINE_int32(value_size
, 100, "Size of each value");
245 DEFINE_int32(seek_nexts
, 0,
246 "How many times to call Next() after Seek() in "
247 "fillseekseq, seekrandom, seekrandomwhilewriting and "
248 "seekrandomwhilemerging");
250 DEFINE_bool(reverse_iterator
, false,
251 "When true use Prev rather than Next for iterators that do "
252 "Seek and then Next");
254 DEFINE_int64(max_scan_distance
, 0,
255 "Used to define iterate_upper_bound (or iterate_lower_bound "
256 "if FLAGS_reverse_iterator is set to true) when value is nonzero");
258 DEFINE_bool(use_uint64_comparator
, false, "use Uint64 user comparator");
260 DEFINE_int64(batch_size
, 1, "Batch size");
// gflags validator for --key_size. Currently accepts any value (both
// arguments deliberately unused); kept so a real range check can be added
// without touching the flag registration.
static bool ValidateKeySize(const char* /*flagname*/, int32_t /*value*/) {
  return true;
}
// gflags validator: the flag is declared as uint64 but must fit in a
// uint32_t. Prints a diagnostic naming the offending flag and returns
// false on overflow; true otherwise.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // Cast for the %lu conversion so the format is portable across LP64/LLP64.
    fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
            (unsigned long)value);
    return false;
  }
  return true;
}
275 DEFINE_int32(key_size
, 16, "size of each key");
277 DEFINE_int32(num_multi_db
, 0,
278 "Number of DBs used in the benchmark. 0 means single DB.");
280 DEFINE_double(compression_ratio
, 0.5, "Arrange to generate values that shrink"
281 " to this fraction of their original size after compression");
283 DEFINE_double(read_random_exp_range
, 0.0,
284 "Read random's key will be generated using distribution of "
285 "num * exp(-r) where r is uniform number from 0 to this value. "
286 "The larger the number is, the more skewed the reads are. "
287 "Only used in readrandom and multireadrandom benchmarks.");
289 DEFINE_bool(histogram
, false, "Print histogram of operation timings");
291 DEFINE_bool(enable_numa
, false,
292 "Make operations aware of NUMA architecture and bind memory "
293 "and cpus corresponding to nodes together. In NUMA, memory "
294 "in same node as CPUs are closer when compared to memory in "
295 "other nodes. Reads can be faster when the process is bound to "
296 "CPU and memory of same node. Use \"$numactl --hardware\" command "
297 "to see NUMA memory architecture.");
299 DEFINE_int64(db_write_buffer_size
, rocksdb::Options().db_write_buffer_size
,
300 "Number of bytes to buffer in all memtables before compacting");
302 DEFINE_bool(cost_write_buffer_to_cache
, false,
303 "The usage of memtable is costed to the block cache");
305 DEFINE_int64(write_buffer_size
, rocksdb::Options().write_buffer_size
,
306 "Number of bytes to buffer in memtable before compacting");
308 DEFINE_int32(max_write_buffer_number
,
309 rocksdb::Options().max_write_buffer_number
,
310 "The number of in-memory memtables. Each memtable is of size"
311 " write_buffer_size bytes.");
313 DEFINE_int32(min_write_buffer_number_to_merge
,
314 rocksdb::Options().min_write_buffer_number_to_merge
,
315 "The minimum number of write buffers that will be merged together"
316 "before writing to storage. This is cheap because it is an"
317 "in-memory merge. If this feature is not enabled, then all these"
318 "write buffers are flushed to L0 as separate files and this "
319 "increases read amplification because a get request has to check"
320 " in all of these files. Also, an in-memory merge may result in"
321 " writing less data to storage if there are duplicate records "
322 " in each of these individual write buffers.");
324 DEFINE_int32(max_write_buffer_number_to_maintain
,
325 rocksdb::Options().max_write_buffer_number_to_maintain
,
326 "The total maximum number of write buffers to maintain in memory "
327 "including copies of buffers that have already been flushed. "
328 "Unlike max_write_buffer_number, this parameter does not affect "
329 "flushing. This controls the minimum amount of write history "
330 "that will be available in memory for conflict checking when "
331 "Transactions are used. If this value is too low, some "
332 "transactions may fail at commit time due to not being able to "
333 "determine whether there were any write conflicts. Setting this "
334 "value to 0 will cause write buffers to be freed immediately "
335 "after they are flushed. If this value is set to -1, "
336 "'max_write_buffer_number' will be used.");
338 DEFINE_int32(max_background_jobs
,
339 rocksdb::Options().max_background_jobs
,
340 "The maximum number of concurrent background jobs that can occur "
343 DEFINE_int32(num_bottom_pri_threads
, 0,
344 "The number of threads in the bottom-priority thread pool (used "
345 "by universal compaction only).");
347 DEFINE_int32(num_high_pri_threads
, 0,
348 "The maximum number of concurrent background compactions"
349 " that can occur in parallel.");
351 DEFINE_int32(num_low_pri_threads
, 0,
352 "The maximum number of concurrent background compactions"
353 " that can occur in parallel.");
355 DEFINE_int32(max_background_compactions
,
356 rocksdb::Options().max_background_compactions
,
357 "The maximum number of concurrent background compactions"
358 " that can occur in parallel.");
360 DEFINE_int32(base_background_compactions
, -1, "DEPRECATED");
362 DEFINE_uint64(subcompactions
, 1,
363 "Maximum number of subcompactions to divide L0-L1 compactions "
365 static const bool FLAGS_subcompactions_dummy
366 __attribute__((__unused__
)) = RegisterFlagValidator(&FLAGS_subcompactions
,
367 &ValidateUint32Range
);
369 DEFINE_int32(max_background_flushes
,
370 rocksdb::Options().max_background_flushes
,
371 "The maximum number of concurrent background flushes"
372 " that can occur in parallel.");
374 static rocksdb::CompactionStyle FLAGS_compaction_style_e
;
375 DEFINE_int32(compaction_style
, (int32_t) rocksdb::Options().compaction_style
,
376 "style of compaction: level-based, universal and fifo");
378 static rocksdb::CompactionPri FLAGS_compaction_pri_e
;
379 DEFINE_int32(compaction_pri
, (int32_t)rocksdb::Options().compaction_pri
,
380 "priority of files to compaction: by size or by data age");
382 DEFINE_int32(universal_size_ratio
, 0,
383 "Percentage flexibility while comparing file size"
384 " (for universal compaction only).");
386 DEFINE_int32(universal_min_merge_width
, 0, "The minimum number of files in a"
387 " single compaction run (for universal compaction only).");
389 DEFINE_int32(universal_max_merge_width
, 0, "The max number of files to compact"
390 " in universal style compaction");
392 DEFINE_int32(universal_max_size_amplification_percent
, 0,
393 "The max size amplification for universal style compaction");
395 DEFINE_int32(universal_compression_size_percent
, -1,
396 "The percentage of the database to compress for universal "
397 "compaction. -1 means compress everything.");
399 DEFINE_bool(universal_allow_trivial_move
, false,
400 "Allow trivial move in universal compaction.");
402 DEFINE_int64(cache_size
, 8 << 20, // 8MB
403 "Number of bytes to use as a cache of uncompressed data");
405 DEFINE_int32(cache_numshardbits
, 6,
406 "Number of shards for the block cache"
407 " is 2 ** cache_numshardbits. Negative means use default settings."
408 " This is applied only if FLAGS_cache_size is non-negative.");
410 DEFINE_double(cache_high_pri_pool_ratio
, 0.0,
411 "Ratio of block cache reserve for high pri blocks. "
412 "If > 0.0, we also enable "
413 "cache_index_and_filter_blocks_with_high_priority.");
415 DEFINE_bool(use_clock_cache
, false,
416 "Replace default LRU block cache with clock cache.");
418 DEFINE_int64(simcache_size
, -1,
419 "Number of bytes to use as a simcache of "
420 "uncompressed data. Nagative value disables simcache.");
422 DEFINE_bool(cache_index_and_filter_blocks
, false,
423 "Cache index/filter blocks in block cache.");
425 DEFINE_bool(partition_index_and_filters
, false,
426 "Partition index and filter blocks.");
428 DEFINE_bool(partition_index
, false, "Partition index blocks");
430 DEFINE_int64(metadata_block_size
,
431 rocksdb::BlockBasedTableOptions().metadata_block_size
,
432 "Max partition size when partitioning index/filters");
434 // The default reduces the overhead of reading time with flash. With HDD, which
435 // offers much less throughput, however, this number better to be set to 1.
436 DEFINE_int32(ops_between_duration_checks
, 1000,
437 "Check duration limit every x ops");
439 DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache
, false,
440 "Pin index/filter blocks of L0 files in block cache.");
443 pin_top_level_index_and_filter
, false,
444 "Pin top-level index of partitioned index/filter blocks in block cache.");
446 DEFINE_int32(block_size
,
447 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size
),
448 "Number of bytes in a block.");
452 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version
),
453 "Format version of SST files.");
455 DEFINE_int32(block_restart_interval
,
456 rocksdb::BlockBasedTableOptions().block_restart_interval
,
457 "Number of keys between restart points "
458 "for delta encoding of keys in data block.");
460 DEFINE_int32(index_block_restart_interval
,
461 rocksdb::BlockBasedTableOptions().index_block_restart_interval
,
462 "Number of keys between restart points "
463 "for delta encoding of keys in index block.");
465 DEFINE_int32(read_amp_bytes_per_bit
,
466 rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit
,
467 "Number of bytes per bit to be used in block read-amp bitmap");
469 DEFINE_bool(enable_index_compression
,
470 rocksdb::BlockBasedTableOptions().enable_index_compression
,
471 "Compress the index block");
473 DEFINE_bool(block_align
, rocksdb::BlockBasedTableOptions().block_align
,
474 "Align data blocks on page size");
476 DEFINE_bool(use_data_block_hash_index
, false,
477 "if use kDataBlockBinaryAndHash "
478 "instead of kDataBlockBinarySearch. "
479 "This is valid if only we use BlockTable");
481 DEFINE_double(data_block_hash_table_util_ratio
, 0.75,
482 "util ratio for data block hash index table. "
483 "This is only valid if use_data_block_hash_index is "
486 DEFINE_int64(compressed_cache_size
, -1,
487 "Number of bytes to use as a cache of compressed data.");
489 DEFINE_int64(row_cache_size
, 0,
490 "Number of bytes to use as a cache of individual rows"
493 DEFINE_int32(open_files
, rocksdb::Options().max_open_files
,
494 "Maximum number of files to keep open at the same time"
495 " (use default if == 0)");
497 DEFINE_int32(file_opening_threads
, rocksdb::Options().max_file_opening_threads
,
498 "If open_files is set to -1, this option set the number of "
499 "threads that will be used to open files during DB::Open()");
501 DEFINE_bool(new_table_reader_for_compaction_inputs
, true,
502 "If true, uses a separate file handle for compaction inputs");
504 DEFINE_int32(compaction_readahead_size
, 0, "Compaction readahead size");
506 DEFINE_int32(random_access_max_buffer_size
, 1024 * 1024,
507 "Maximum windows randomaccess buffer size");
509 DEFINE_int32(writable_file_max_buffer_size
, 1024 * 1024,
510 "Maximum write buffer for Writable File");
512 DEFINE_int32(bloom_bits
, -1, "Bloom filter bits per key. Negative means"
513 " use default settings.");
514 DEFINE_double(memtable_bloom_size_ratio
, 0,
515 "Ratio of memtable size used for bloom filter. 0 means no bloom "
517 DEFINE_bool(memtable_whole_key_filtering
, false,
518 "Try to use whole key bloom filter in memtables.");
519 DEFINE_bool(memtable_use_huge_page
, false,
520 "Try to use huge page in memtables.");
522 DEFINE_bool(use_existing_db
, false, "If true, do not destroy the existing"
523 " database. If you set this flag and also specify a benchmark that"
524 " wants a fresh database, that benchmark will fail.");
526 DEFINE_bool(use_existing_keys
, false,
527 "If true, uses existing keys in the DB, "
528 "rather than generating new ones. This involves some startup "
529 "latency to load all keys into memory. It is supported for the "
530 "same read/overwrite benchmarks as `-use_existing_db=true`, which "
531 "must also be set for this flag to be enabled. When this flag is "
532 "set, the value for `-num` will be ignored.");
534 DEFINE_bool(show_table_properties
, false,
535 "If true, then per-level table"
536 " properties will be printed on every stats-interval when"
537 " stats_interval is set and stats_per_interval is on.");
539 DEFINE_string(db
, "", "Use the db with the following name.");
543 DEFINE_string(read_cache_path
, "",
544 "If not empty string, a read cache will be used in this path");
546 DEFINE_int64(read_cache_size
, 4LL * 1024 * 1024 * 1024,
547 "Maximum size of the read cache");
549 DEFINE_bool(read_cache_direct_write
, true,
550 "Whether to use Direct IO for writing to the read cache");
552 DEFINE_bool(read_cache_direct_read
, true,
553 "Whether to use Direct IO for reading from read cache");
555 DEFINE_bool(use_keep_filter
, false, "Whether to use a noop compaction filter");
// gflags validator for --cache_numshardbits: the block cache creates
// 2^value shards, so the value must stay below 20 to keep the shard count
// sane. Negative values are allowed (they select the default sharding).
static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  if (value >= 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
    return false;
  }
  return true;
}
566 DEFINE_bool(verify_checksum
, true,
567 "Verify checksum for every block read"
570 DEFINE_bool(statistics
, false, "Database statistics");
571 DEFINE_int32(stats_level
, rocksdb::StatsLevel::kExceptDetailedTimers
,
572 "stats level for statistics");
573 DEFINE_string(statistics_string
, "", "Serialized statistics string");
574 static class std::shared_ptr
<rocksdb::Statistics
> dbstats
;
576 DEFINE_int64(writes
, -1, "Number of write operations to do. If negative, do"
579 DEFINE_bool(finish_after_writes
, false, "Write thread terminates after all writes are finished");
581 DEFINE_bool(sync
, false, "Sync all writes to disk");
583 DEFINE_bool(use_fsync
, false, "If true, issue fsync instead of fdatasync");
585 DEFINE_bool(disable_wal
, false, "If true, do not write WAL for write.");
587 DEFINE_string(wal_dir
, "", "If not empty, use the given dir for WAL");
589 DEFINE_string(truth_db
, "/dev/shm/truth_db/dbbench",
590 "Truth key/values used when using verify");
592 DEFINE_int32(num_levels
, 7, "The total number of levels");
594 DEFINE_int64(target_file_size_base
, rocksdb::Options().target_file_size_base
,
595 "Target file size at level-1");
597 DEFINE_int32(target_file_size_multiplier
,
598 rocksdb::Options().target_file_size_multiplier
,
599 "A multiplier to compute target level-N file size (N >= 2)");
601 DEFINE_uint64(max_bytes_for_level_base
,
602 rocksdb::Options().max_bytes_for_level_base
,
603 "Max bytes for level-1");
605 DEFINE_bool(level_compaction_dynamic_level_bytes
, false,
606 "Whether level size base is dynamic");
608 DEFINE_double(max_bytes_for_level_multiplier
, 10,
609 "A multiplier to compute max bytes for level-N (N >= 2)");
611 static std::vector
<int> FLAGS_max_bytes_for_level_multiplier_additional_v
;
612 DEFINE_string(max_bytes_for_level_multiplier_additional
, "",
613 "A vector that specifies additional fanout per level");
615 DEFINE_int32(level0_stop_writes_trigger
,
616 rocksdb::Options().level0_stop_writes_trigger
,
617 "Number of files in level-0"
618 " that will trigger put stop.");
620 DEFINE_int32(level0_slowdown_writes_trigger
,
621 rocksdb::Options().level0_slowdown_writes_trigger
,
622 "Number of files in level-0"
623 " that will slow down writes.");
625 DEFINE_int32(level0_file_num_compaction_trigger
,
626 rocksdb::Options().level0_file_num_compaction_trigger
,
627 "Number of files in level-0"
628 " when compactions start");
// gflags validator for percentage flags (readwritepercent, deletepercent,
// ...): the value must lie strictly between 0 and 100.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value <= 0 || value >= 100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
            flagname, value);
    return false;
  }
  return true;
}
638 DEFINE_int32(readwritepercent
, 90, "Ratio of reads to reads/writes (expressed"
639 " as percentage) for the ReadRandomWriteRandom workload. The "
640 "default value 90 means 90% operations out of all reads and writes"
641 " operations are reads. In other words, 9 gets for every 1 put.");
643 DEFINE_int32(mergereadpercent
, 70, "Ratio of merges to merges&reads (expressed"
644 " as percentage) for the ReadRandomMergeRandom workload. The"
645 " default value 70 means 70% out of all read and merge operations"
646 " are merges. In other words, 7 merges for every 3 gets.");
648 DEFINE_int32(deletepercent
, 2, "Percentage of deletes out of reads/writes/"
649 "deletes (used in RandomWithVerify only). RandomWithVerify "
650 "calculates writepercent as (100 - FLAGS_readwritepercent - "
651 "deletepercent), so deletepercent must be smaller than (100 - "
652 "FLAGS_readwritepercent)");
654 DEFINE_bool(optimize_filters_for_hits
, false,
655 "Optimizes bloom filters for workloads for most lookups return "
656 "a value. For now this doesn't create bloom filters for the max "
657 "level of the LSM to reduce metadata that should fit in RAM. ");
659 DEFINE_uint64(delete_obsolete_files_period_micros
, 0,
660 "Ignored. Left here for backward compatibility");
662 DEFINE_int64(writes_before_delete_range
, 0,
663 "Number of writes before DeleteRange is called regularly.");
665 DEFINE_int64(writes_per_range_tombstone
, 0,
666 "Number of writes between range tombstones");
668 DEFINE_int64(range_tombstone_width
, 100, "Number of keys in tombstone's range");
670 DEFINE_int64(max_num_range_tombstones
, 0,
671 "Maximum number of range tombstones "
674 DEFINE_bool(expand_range_tombstones
, false,
675 "Expand range tombstone into sequential regular tombstones.");
678 // Transactions Options
679 DEFINE_bool(optimistic_transaction_db
, false,
680 "Open a OptimisticTransactionDB instance. "
681 "Required for randomtransaction benchmark.");
683 DEFINE_bool(transaction_db
, false,
684 "Open a TransactionDB instance. "
685 "Required for randomtransaction benchmark.");
687 DEFINE_uint64(transaction_sets
, 2,
688 "Number of keys each transaction will "
689 "modify (use in RandomTransaction only). Max: 9999");
691 DEFINE_bool(transaction_set_snapshot
, false,
692 "Setting to true will have each transaction call SetSnapshot()"
695 DEFINE_int32(transaction_sleep
, 0,
696 "Max microseconds to sleep in between "
697 "reading and writing a value (used in RandomTransaction only). ");
699 DEFINE_uint64(transaction_lock_timeout
, 100,
700 "If using a transaction_db, specifies the lock wait timeout in"
701 " milliseconds before failing a transaction waiting on a lock");
704 "The path to a RocksDB options file. If specified, then db_bench will "
705 "run with the RocksDB options in the default column family of the "
706 "specified options file. "
707 "Note that with this setting, db_bench will ONLY accept the following "
708 "RocksDB options related command-line arguments, all other arguments "
709 "that are related to RocksDB options will be ignored:\n"
710 "\t--use_existing_db\n"
711 "\t--use_existing_keys\n"
713 "\t--row_cache_size\n"
714 "\t--row_cache_numshardbits\n"
715 "\t--enable_io_prio\n"
716 "\t--dump_malloc_stats\n"
717 "\t--num_multi_db\n");
719 // FIFO Compaction Options
720 DEFINE_uint64(fifo_compaction_max_table_files_size_mb
, 0,
721 "The limit of total table file sizes to trigger FIFO compaction");
723 DEFINE_bool(fifo_compaction_allow_compaction
, true,
724 "Allow compaction in FIFO compaction.");
726 DEFINE_uint64(fifo_compaction_ttl
, 0, "TTL for the SST Files in seconds.");
729 DEFINE_bool(use_blob_db
, false,
730 "Open a BlobDB instance. "
731 "Required for large value benchmark.");
733 DEFINE_bool(blob_db_enable_gc
, false, "Enable BlobDB garbage collection.");
735 DEFINE_bool(blob_db_is_fifo
, false, "Enable FIFO eviction strategy in BlobDB.");
737 DEFINE_uint64(blob_db_max_db_size
, 0,
738 "Max size limit of the directory where blob files are stored.");
740 DEFINE_uint64(blob_db_max_ttl_range
, 86400,
741 "TTL range to generate BlobDB data (in seconds).");
743 DEFINE_uint64(blob_db_ttl_range_secs
, 3600,
744 "TTL bucket size to use when creating blob files.");
746 DEFINE_uint64(blob_db_min_blob_size
, 0,
747 "Smallest blob to store in a file. Blobs smaller than this "
748 "will be inlined with the key in the LSM tree.");
750 DEFINE_uint64(blob_db_bytes_per_sync
, 0, "Bytes to sync blob file at.");
752 DEFINE_uint64(blob_db_file_size
, 256 * 1024 * 1024,
753 "Target size of each blob file.");
755 #endif // ROCKSDB_LITE
757 DEFINE_bool(report_bg_io_stats
, false,
758 "Measure times spents on I/Os while in compactions. ");
760 DEFINE_bool(use_stderr_info_logger
, false,
761 "Write info logs to stderr instead of to LOG file. ");
763 DEFINE_string(trace_file
, "", "Trace workload to a file. ");
765 static enum rocksdb::CompressionType
StringToCompressionType(const char* ctype
) {
768 if (!strcasecmp(ctype
, "none"))
769 return rocksdb::kNoCompression
;
770 else if (!strcasecmp(ctype
, "snappy"))
771 return rocksdb::kSnappyCompression
;
772 else if (!strcasecmp(ctype
, "zlib"))
773 return rocksdb::kZlibCompression
;
774 else if (!strcasecmp(ctype
, "bzip2"))
775 return rocksdb::kBZip2Compression
;
776 else if (!strcasecmp(ctype
, "lz4"))
777 return rocksdb::kLZ4Compression
;
778 else if (!strcasecmp(ctype
, "lz4hc"))
779 return rocksdb::kLZ4HCCompression
;
780 else if (!strcasecmp(ctype
, "xpress"))
781 return rocksdb::kXpressCompression
;
782 else if (!strcasecmp(ctype
, "zstd"))
783 return rocksdb::kZSTD
;
785 fprintf(stdout
, "Cannot parse compression type '%s'\n", ctype
);
786 return rocksdb::kSnappyCompression
; // default value
789 static std::string
ColumnFamilyName(size_t i
) {
791 return rocksdb::kDefaultColumnFamilyName
;
794 snprintf(name
, sizeof(name
), "column_family_name_%06zu", i
);
795 return std::string(name
);
799 DEFINE_string(compression_type
, "snappy",
800 "Algorithm to use to compress the database");
801 static enum rocksdb::CompressionType FLAGS_compression_type_e
=
802 rocksdb::kSnappyCompression
;
804 DEFINE_int64(sample_for_compression
, 0, "Sample every N block for compression");
806 DEFINE_int32(compression_level
, rocksdb::CompressionOptions().level
,
807 "Compression level. The meaning of this value is library-"
808 "dependent. If unset, we try to use the default for the library "
809 "specified in `--compression_type`");
811 DEFINE_int32(compression_max_dict_bytes
,
812 rocksdb::CompressionOptions().max_dict_bytes
,
813 "Maximum size of dictionary used to prime the compression "
816 DEFINE_int32(compression_zstd_max_train_bytes
,
817 rocksdb::CompressionOptions().zstd_max_train_bytes
,
818 "Maximum size of training data passed to zstd's dictionary "
821 DEFINE_int32(min_level_to_compress
, -1, "If non-negative, compression starts"
822 " from this level. Levels with number < min_level_to_compress are"
823 " not compressed. Otherwise, apply compression_type to "
// gflags validator for --table_cache_numshardbits: the table cache requires
// a shard-bit count in (0, 20].
static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  if (0 >= value || value > 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be 0 < val <= 20\n",
            flagname, value);
    return false;
  }
  return true;
}
835 DEFINE_int32(table_cache_numshardbits
, 4, "");
838 DEFINE_string(env_uri
, "", "URI for registry Env lookup. Mutually exclusive"
840 #endif // ROCKSDB_LITE
841 DEFINE_string(hdfs
, "", "Name of hdfs environment. Mutually exclusive with"
843 static rocksdb::Env
* FLAGS_env
= rocksdb::Env::Default();
845 DEFINE_int64(stats_interval
, 0, "Stats are reported every N operations when "
846 "this is greater than zero. When 0 the interval grows over time.");
848 DEFINE_int64(stats_interval_seconds
, 0, "Report stats every N seconds. This "
849 "overrides stats_interval when both are > 0.");
851 DEFINE_int32(stats_per_interval
, 0, "Reports additional stats per interval when"
852 " this is greater than 0.");
854 DEFINE_int64(report_interval_seconds
, 0,
855 "If greater than zero, it will write simple stats in CVS format "
856 "to --report_file every N seconds");
858 DEFINE_string(report_file
, "report.csv",
859 "Filename where some simple stats are reported to (if "
860 "--report_interval_seconds is bigger than 0)");
862 DEFINE_int32(thread_status_per_interval
, 0,
863 "Takes and report a snapshot of the current status of each thread"
864 " when this is greater than 0.");
866 DEFINE_int32(perf_level
, rocksdb::PerfLevel::kDisable
, "Level of perf collection");
// gflags validator for rate-limit flags: any non-negative double is valid.
// A tiny EPSILON tolerance keeps -0.0 and floating-point rounding artifacts
// from being rejected.
static bool ValidateRateLimit(const char* flagname, double value) {
  const double EPSILON = 1e-10;
  if ( value < -EPSILON ) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
    return false;
  }
  return true;
}
877 DEFINE_double(soft_rate_limit
, 0.0, "DEPRECATED");
879 DEFINE_double(hard_rate_limit
, 0.0, "DEPRECATED");
881 DEFINE_uint64(soft_pending_compaction_bytes_limit
, 64ull * 1024 * 1024 * 1024,
882 "Slowdown writes if pending compaction bytes exceed this number");
884 DEFINE_uint64(hard_pending_compaction_bytes_limit
, 128ull * 1024 * 1024 * 1024,
885 "Stop writes if pending compaction bytes exceed this number");
887 DEFINE_uint64(delayed_write_rate
, 8388608u,
888 "Limited bytes allowed to DB when soft_rate_limit or "
889 "level0_slowdown_writes_trigger triggers");
891 DEFINE_bool(enable_pipelined_write
, true,
892 "Allow WAL and memtable writes to be pipelined");
894 DEFINE_bool(allow_concurrent_memtable_write
, true,
895 "Allow multi-writers to update mem tables in parallel.");
897 DEFINE_bool(inplace_update_support
, rocksdb::Options().inplace_update_support
,
898 "Support in-place memtable update for smaller or same-size values");
900 DEFINE_uint64(inplace_update_num_locks
,
901 rocksdb::Options().inplace_update_num_locks
,
902 "Number of RW locks to protect in-place memtable updates");
904 DEFINE_bool(enable_write_thread_adaptive_yield
, true,
905 "Use a yielding spin loop for brief writer thread waits.");
908 write_thread_max_yield_usec
, 100,
909 "Maximum microseconds for enable_write_thread_adaptive_yield operation.");
911 DEFINE_uint64(write_thread_slow_yield_usec
, 3,
912 "The threshold at which a slow yield is considered a signal that "
913 "other processes or threads want the core.");
915 DEFINE_int32(rate_limit_delay_max_milliseconds
, 1000,
916 "When hard_rate_limit is set then this is the max time a put will"
919 DEFINE_uint64(rate_limiter_bytes_per_sec
, 0, "Set options.rate_limiter value.");
921 DEFINE_bool(rate_limiter_auto_tuned
, false,
922 "Enable dynamic adjustment of rate limit according to demand for "
926 DEFINE_bool(sine_write_rate
, false,
927 "Use a sine wave write_rate_limit");
929 DEFINE_uint64(sine_write_rate_interval_milliseconds
, 10000,
930 "Interval of which the sine wave write_rate_limit is recalculated");
932 DEFINE_double(sine_a
, 1,
933 "A in f(x) = A sin(bx + c) + d");
935 DEFINE_double(sine_b
, 1,
936 "B in f(x) = A sin(bx + c) + d");
938 DEFINE_double(sine_c
, 0,
939 "C in f(x) = A sin(bx + c) + d");
941 DEFINE_double(sine_d
, 1,
942 "D in f(x) = A sin(bx + c) + d");
944 DEFINE_bool(rate_limit_bg_reads
, false,
945 "Use options.rate_limiter on compaction reads");
948 benchmark_write_rate_limit
, 0,
949 "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
950 "is the global rate in bytes/second.");
952 // the parameters of mix_graph
953 DEFINE_double(key_dist_a
, 0.0,
954 "The parameter 'a' of key access distribution model "
956 DEFINE_double(key_dist_b
, 0.0,
957 "The parameter 'b' of key access distribution model "
959 DEFINE_double(value_theta
, 0.0,
960 "The parameter 'theta' of Generized Pareto Distribution "
961 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
962 DEFINE_double(value_k
, 0.0,
963 "The parameter 'k' of Generized Pareto Distribution "
964 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
965 DEFINE_double(value_sigma
, 0.0,
966 "The parameter 'theta' of Generized Pareto Distribution "
967 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
968 DEFINE_double(iter_theta
, 0.0,
969 "The parameter 'theta' of Generized Pareto Distribution "
970 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
971 DEFINE_double(iter_k
, 0.0,
972 "The parameter 'k' of Generized Pareto Distribution "
973 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
974 DEFINE_double(iter_sigma
, 0.0,
975 "The parameter 'sigma' of Generized Pareto Distribution "
976 "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
977 DEFINE_double(mix_get_ratio
, 1.0,
978 "The ratio of Get queries of mix_graph workload");
979 DEFINE_double(mix_put_ratio
, 0.0,
980 "The ratio of Put queries of mix_graph workload");
981 DEFINE_double(mix_seek_ratio
, 0.0,
982 "The ratio of Seek queries of mix_graph workload");
983 DEFINE_int64(mix_max_scan_len
, 10000, "The max scan length of Iterator");
984 DEFINE_int64(mix_ave_kv_size
, 512,
985 "The average key-value size of this workload");
986 DEFINE_int64(mix_max_value_size
, 1024, "The max value size of this workload");
988 sine_mix_rate_noise
, 0.0,
989 "Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
990 DEFINE_bool(sine_mix_rate
, false,
991 "Enable the sine QPS control on the mix workload");
993 sine_mix_rate_interval_milliseconds
, 10000,
994 "Interval of which the sine wave read_rate_limit is recalculated");
995 DEFINE_int64(mix_accesses
, -1,
996 "The total query accesses of mix_graph workload");
999 benchmark_read_rate_limit
, 0,
1000 "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
1001 "is the global rate in ops/second.");
1003 DEFINE_uint64(max_compaction_bytes
, rocksdb::Options().max_compaction_bytes
,
1004 "Max bytes allowed in one compaction");
1006 #ifndef ROCKSDB_LITE
1007 DEFINE_bool(readonly
, false, "Run read only benchmarks.");
1009 DEFINE_bool(print_malloc_stats
, false,
1010 "Print malloc stats to stdout after benchmarks finish.");
1011 #endif // ROCKSDB_LITE
1013 DEFINE_bool(disable_auto_compactions
, false, "Do not auto trigger compactions");
1015 DEFINE_uint64(wal_ttl_seconds
, 0, "Set the TTL for the WAL Files in seconds.");
1016 DEFINE_uint64(wal_size_limit_MB
, 0, "Set the size limit for the WAL Files"
1018 DEFINE_uint64(max_total_wal_size
, 0, "Set total max WAL size");
1020 DEFINE_bool(mmap_read
, rocksdb::Options().allow_mmap_reads
,
1021 "Allow reads to occur via mmap-ing files");
1023 DEFINE_bool(mmap_write
, rocksdb::Options().allow_mmap_writes
,
1024 "Allow writes to occur via mmap-ing files");
1026 DEFINE_bool(use_direct_reads
, rocksdb::Options().use_direct_reads
,
1027 "Use O_DIRECT for reading data");
1029 DEFINE_bool(use_direct_io_for_flush_and_compaction
,
1030 rocksdb::Options().use_direct_io_for_flush_and_compaction
,
1031 "Use O_DIRECT for background flush and compaction writes");
1033 DEFINE_bool(advise_random_on_open
, rocksdb::Options().advise_random_on_open
,
1034 "Advise random access on table file open");
1036 DEFINE_string(compaction_fadvice
, "NORMAL",
1037 "Access pattern advice when a file is compacted");
1038 static auto FLAGS_compaction_fadvice_e
=
1039 rocksdb::Options().access_hint_on_compaction_start
;
1041 DEFINE_bool(use_tailing_iterator
, false,
1042 "Use tailing iterator to access a series of keys instead of get");
1044 DEFINE_bool(use_adaptive_mutex
, rocksdb::Options().use_adaptive_mutex
,
1045 "Use adaptive mutex");
1047 DEFINE_uint64(bytes_per_sync
, rocksdb::Options().bytes_per_sync
,
1048 "Allows OS to incrementally sync SST files to disk while they are"
1049 " being written, in the background. Issue one request for every"
1050 " bytes_per_sync written. 0 turns it off.");
1052 DEFINE_uint64(wal_bytes_per_sync
, rocksdb::Options().wal_bytes_per_sync
,
1053 "Allows OS to incrementally sync WAL files to disk while they are"
1054 " being written, in the background. Issue one request for every"
1055 " wal_bytes_per_sync written. 0 turns it off.");
1057 DEFINE_bool(use_single_deletes
, true,
1058 "Use single deletes (used in RandomReplaceKeys only).");
1060 DEFINE_double(stddev
, 2000.0,
1061 "Standard deviation of normal distribution used for picking keys"
1062 " (used in RandomReplaceKeys only).");
1064 DEFINE_int32(key_id_range
, 100000,
1065 "Range of possible value of key id (used in TimeSeries only).");
1067 DEFINE_string(expire_style
, "none",
1068 "Style to remove expired time entries. Can be one of the options "
1069 "below: none (do not expired data), compaction_filter (use a "
1070 "compaction filter to remove expired data), delete (seek IDs and "
1071 "remove expired data) (used in TimeSeries only).");
1075 "Range of timestamp that store in the database (used in TimeSeries"
1078 DEFINE_int32(num_deletion_threads
, 1,
1079 "Number of threads to do deletion (used in TimeSeries and delete "
1080 "expire_style only).");
1082 DEFINE_int32(max_successive_merges
, 0, "Maximum number of successive merge"
1083 " operations on a key in the memtable");
// Validates --prefix_size (registered as a gflags validator).
// Returns true when the value is in range; otherwise prints an error to
// stderr and returns false.
// NOTE(review): the message says "<= 2000000000" but the check rejects
// exactly 2000000000 (value >= bound); preserved as-is to avoid changing
// accepted flag values — confirm intent upstream.
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value >= 2000000000) {
    fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
1093 DEFINE_int32(prefix_size
, 0, "control the prefix size for HashSkipList and "
1095 DEFINE_int64(keys_per_prefix
, 0, "control average number of keys generated "
1096 "per prefix, 0 means no special handling of the prefix, "
1097 "i.e. use the prefix comes with the generated random number.");
1098 DEFINE_int32(memtable_insert_with_hint_prefix_size
, 0,
1099 "If non-zero, enable "
1100 "memtable insert with hint with the given prefix size.");
1101 DEFINE_bool(enable_io_prio
, false, "Lower the background flush/compaction "
1102 "threads' IO priority");
1103 DEFINE_bool(enable_cpu_prio
, false, "Lower the background flush/compaction "
1104 "threads' CPU priority");
1105 DEFINE_bool(identity_as_first_hash
, false, "the first hash function of cuckoo "
1106 "table becomes an identity function. This is only valid when key "
1108 DEFINE_bool(dump_malloc_stats
, true, "Dump malloc stats in LOG ");
1109 DEFINE_uint64(stats_dump_period_sec
, rocksdb::Options().stats_dump_period_sec
,
1110 "Gap between printing stats to log in seconds");
1111 DEFINE_uint64(stats_persist_period_sec
,
1112 rocksdb::Options().stats_persist_period_sec
,
1113 "Gap between persisting stats in seconds");
1114 DEFINE_uint64(stats_history_buffer_size
,
1115 rocksdb::Options().stats_history_buffer_size
,
1116 "Max number of stats snapshots to keep in memory");
1125 static enum RepFactory
StringToRepFactory(const char* ctype
) {
1128 if (!strcasecmp(ctype
, "skip_list"))
1130 else if (!strcasecmp(ctype
, "prefix_hash"))
1132 else if (!strcasecmp(ctype
, "vector"))
1134 else if (!strcasecmp(ctype
, "hash_linkedlist"))
1135 return kHashLinkedList
;
1137 fprintf(stdout
, "Cannot parse memreptable %s\n", ctype
);
1141 static enum RepFactory FLAGS_rep_factory
;
1142 DEFINE_string(memtablerep
, "skip_list", "");
1143 DEFINE_int64(hash_bucket_count
, 1024 * 1024, "hash bucket count");
1144 DEFINE_bool(use_plain_table
, false, "if use plain table "
1145 "instead of block-based table format");
1146 DEFINE_bool(use_cuckoo_table
, false, "if use cuckoo table format");
1147 DEFINE_double(cuckoo_hash_ratio
, 0.9, "Hash ratio for Cuckoo SST table.");
1148 DEFINE_bool(use_hash_search
, false, "if use kHashSearch "
1149 "instead of kBinarySearch. "
1150 "This is valid if only we use BlockTable");
1151 DEFINE_bool(use_block_based_filter
, false, "if use kBlockBasedFilter "
1152 "instead of kFullFilter for filter block. "
1153 "This is valid if only we use BlockTable");
1154 DEFINE_string(merge_operator
, "", "The merge operator to use with the database."
1155 "If a new merge operator is specified, be sure to use fresh"
1156 " database The possible merge operators are defined in"
1157 " utilities/merge_operators.h");
1158 DEFINE_int32(skip_list_lookahead
, 0, "Used with skip_list memtablerep; try "
1159 "linear search first for this many steps from the previous "
1161 DEFINE_bool(report_file_operations
, false, "if report number of file "
1164 static const bool FLAGS_soft_rate_limit_dummy
__attribute__((__unused__
)) =
1165 RegisterFlagValidator(&FLAGS_soft_rate_limit
, &ValidateRateLimit
);
1167 static const bool FLAGS_hard_rate_limit_dummy
__attribute__((__unused__
)) =
1168 RegisterFlagValidator(&FLAGS_hard_rate_limit
, &ValidateRateLimit
);
1170 static const bool FLAGS_prefix_size_dummy
__attribute__((__unused__
)) =
1171 RegisterFlagValidator(&FLAGS_prefix_size
, &ValidatePrefixSize
);
1173 static const bool FLAGS_key_size_dummy
__attribute__((__unused__
)) =
1174 RegisterFlagValidator(&FLAGS_key_size
, &ValidateKeySize
);
1176 static const bool FLAGS_cache_numshardbits_dummy
__attribute__((__unused__
)) =
1177 RegisterFlagValidator(&FLAGS_cache_numshardbits
,
1178 &ValidateCacheNumshardbits
);
1180 static const bool FLAGS_readwritepercent_dummy
__attribute__((__unused__
)) =
1181 RegisterFlagValidator(&FLAGS_readwritepercent
, &ValidateInt32Percent
);
1183 DEFINE_int32(disable_seek_compaction
, false,
1184 "Not used, left here for backwards compatibility");
1186 static const bool FLAGS_deletepercent_dummy
__attribute__((__unused__
)) =
1187 RegisterFlagValidator(&FLAGS_deletepercent
, &ValidateInt32Percent
);
1188 static const bool FLAGS_table_cache_numshardbits_dummy
__attribute__((__unused__
)) =
1189 RegisterFlagValidator(&FLAGS_table_cache_numshardbits
,
1190 &ValidateTableCacheNumshardbits
);
1195 struct ReportFileOpCounters
{
1196 std::atomic
<int> open_counter_
;
1197 std::atomic
<int> read_counter_
;
1198 std::atomic
<int> append_counter_
;
1199 std::atomic
<uint64_t> bytes_read_
;
1200 std::atomic
<uint64_t> bytes_written_
;
1203 // A special Env to records and report file operations in db_bench
1204 class ReportFileOpEnv
: public EnvWrapper
{
1206 explicit ReportFileOpEnv(Env
* base
) : EnvWrapper(base
) { reset(); }
1209 counters_
.open_counter_
= 0;
1210 counters_
.read_counter_
= 0;
1211 counters_
.append_counter_
= 0;
1212 counters_
.bytes_read_
= 0;
1213 counters_
.bytes_written_
= 0;
1216 Status
NewSequentialFile(const std::string
& f
,
1217 std::unique_ptr
<SequentialFile
>* r
,
1218 const EnvOptions
& soptions
) override
{
1219 class CountingFile
: public SequentialFile
{
1221 std::unique_ptr
<SequentialFile
> target_
;
1222 ReportFileOpCounters
* counters_
;
1225 CountingFile(std::unique_ptr
<SequentialFile
>&& target
,
1226 ReportFileOpCounters
* counters
)
1227 : target_(std::move(target
)), counters_(counters
) {}
1229 Status
Read(size_t n
, Slice
* result
, char* scratch
) override
{
1230 counters_
->read_counter_
.fetch_add(1, std::memory_order_relaxed
);
1231 Status rv
= target_
->Read(n
, result
, scratch
);
1232 counters_
->bytes_read_
.fetch_add(result
->size(),
1233 std::memory_order_relaxed
);
1237 Status
Skip(uint64_t n
) override
{ return target_
->Skip(n
); }
1240 Status s
= target()->NewSequentialFile(f
, r
, soptions
);
1242 counters()->open_counter_
.fetch_add(1, std::memory_order_relaxed
);
1243 r
->reset(new CountingFile(std::move(*r
), counters()));
1248 Status
NewRandomAccessFile(const std::string
& f
,
1249 std::unique_ptr
<RandomAccessFile
>* r
,
1250 const EnvOptions
& soptions
) override
{
1251 class CountingFile
: public RandomAccessFile
{
1253 std::unique_ptr
<RandomAccessFile
> target_
;
1254 ReportFileOpCounters
* counters_
;
1257 CountingFile(std::unique_ptr
<RandomAccessFile
>&& target
,
1258 ReportFileOpCounters
* counters
)
1259 : target_(std::move(target
)), counters_(counters
) {}
1260 Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
1261 char* scratch
) const override
{
1262 counters_
->read_counter_
.fetch_add(1, std::memory_order_relaxed
);
1263 Status rv
= target_
->Read(offset
, n
, result
, scratch
);
1264 counters_
->bytes_read_
.fetch_add(result
->size(),
1265 std::memory_order_relaxed
);
1270 Status s
= target()->NewRandomAccessFile(f
, r
, soptions
);
1272 counters()->open_counter_
.fetch_add(1, std::memory_order_relaxed
);
1273 r
->reset(new CountingFile(std::move(*r
), counters()));
1278 Status
NewWritableFile(const std::string
& f
, std::unique_ptr
<WritableFile
>* r
,
1279 const EnvOptions
& soptions
) override
{
1280 class CountingFile
: public WritableFile
{
1282 std::unique_ptr
<WritableFile
> target_
;
1283 ReportFileOpCounters
* counters_
;
1286 CountingFile(std::unique_ptr
<WritableFile
>&& target
,
1287 ReportFileOpCounters
* counters
)
1288 : target_(std::move(target
)), counters_(counters
) {}
1290 Status
Append(const Slice
& data
) override
{
1291 counters_
->append_counter_
.fetch_add(1, std::memory_order_relaxed
);
1292 Status rv
= target_
->Append(data
);
1293 counters_
->bytes_written_
.fetch_add(data
.size(),
1294 std::memory_order_relaxed
);
1298 Status
Truncate(uint64_t size
) override
{ return target_
->Truncate(size
); }
1299 Status
Close() override
{ return target_
->Close(); }
1300 Status
Flush() override
{ return target_
->Flush(); }
1301 Status
Sync() override
{ return target_
->Sync(); }
1304 Status s
= target()->NewWritableFile(f
, r
, soptions
);
1306 counters()->open_counter_
.fetch_add(1, std::memory_order_relaxed
);
1307 r
->reset(new CountingFile(std::move(*r
), counters()));
1313 ReportFileOpCounters
* counters() { return &counters_
; }
1316 ReportFileOpCounters counters_
;
1321 // Helper for quickly generating random data.
1322 class RandomGenerator
{
1329 // We use a limited amount of data over and over again and ensure
1330 // that it is larger than the compression window (32KB), and also
1331 // large enough to serve all typical value sizes we want to write.
1334 while (data_
.size() < (unsigned)std::max(1048576, FLAGS_value_size
)) {
1335 // Add a short fragment that is as compressible as specified
1336 // by FLAGS_compression_ratio.
1337 test::CompressibleString(&rnd
, FLAGS_compression_ratio
, 100, &piece
);
1338 data_
.append(piece
);
1343 Slice
Generate(unsigned int len
) {
1344 assert(len
<= data_
.size());
1345 if (pos_
+ len
> data_
.size()) {
1349 return Slice(data_
.data() + pos_
- len
, len
);
1352 Slice
GenerateWithTTL(unsigned int len
) {
1353 assert(len
<= data_
.size());
1354 if (pos_
+ len
> data_
.size()) {
1358 return Slice(data_
.data() + pos_
- len
, len
);
1362 static void AppendWithSpace(std::string
* str
, Slice msg
) {
1363 if (msg
.empty()) return;
1364 if (!str
->empty()) {
1365 str
->push_back(' ');
1367 str
->append(msg
.data(), msg
.size());
1370 struct DBWithColumnFamilies
{
1371 std::vector
<ColumnFamilyHandle
*> cfh
;
1373 #ifndef ROCKSDB_LITE
1374 OptimisticTransactionDB
* opt_txn_db
;
1375 #endif // ROCKSDB_LITE
1376 std::atomic
<size_t> num_created
; // Need to be updated after all the
1377 // new entries in cfh are set.
1378 size_t num_hot
; // Number of column families to be queried at each moment.
1379 // After each CreateNewCf(), another num_hot number of new
1380 // Column families will be created and used to be queried.
1381 port::Mutex create_cf_mutex
; // Only one thread can execute CreateNewCf()
1382 std::vector
<int> cfh_idx_to_prob
; // ith index holds probability of operating
1385 DBWithColumnFamilies()
1387 #ifndef ROCKSDB_LITE
1388 , opt_txn_db(nullptr)
1389 #endif // ROCKSDB_LITE
1396 DBWithColumnFamilies(const DBWithColumnFamilies
& other
)
1399 #ifndef ROCKSDB_LITE
1400 opt_txn_db(other
.opt_txn_db
),
1401 #endif // ROCKSDB_LITE
1402 num_created(other
.num_created
.load()),
1403 num_hot(other
.num_hot
),
1404 cfh_idx_to_prob(other
.cfh_idx_to_prob
) {
1408 std::for_each(cfh
.begin(), cfh
.end(),
1409 [](ColumnFamilyHandle
* cfhi
) { delete cfhi
; });
1411 #ifndef ROCKSDB_LITE
1414 opt_txn_db
= nullptr;
1422 #endif // ROCKSDB_LITE
1425 ColumnFamilyHandle
* GetCfh(int64_t rand_num
) {
1426 assert(num_hot
> 0);
1427 size_t rand_offset
= 0;
1428 if (!cfh_idx_to_prob
.empty()) {
1429 assert(cfh_idx_to_prob
.size() == num_hot
);
1431 while (sum
+ cfh_idx_to_prob
[rand_offset
] < rand_num
% 100) {
1432 sum
+= cfh_idx_to_prob
[rand_offset
];
1435 assert(rand_offset
< cfh_idx_to_prob
.size());
1437 rand_offset
= rand_num
% num_hot
;
1439 return cfh
[num_created
.load(std::memory_order_acquire
) - num_hot
+
1443 // stage: assume CF from 0 to stage * num_hot has be created. Need to create
1444 // stage * num_hot + 1 to stage * (num_hot + 1).
1445 void CreateNewCf(ColumnFamilyOptions options
, int64_t stage
) {
1446 MutexLock
l(&create_cf_mutex
);
1447 if ((stage
+ 1) * num_hot
<= num_created
) {
1451 auto new_num_created
= num_created
+ num_hot
;
1452 assert(new_num_created
<= cfh
.size());
1453 for (size_t i
= num_created
; i
< new_num_created
; i
++) {
1455 db
->CreateColumnFamily(options
, ColumnFamilyName(i
), &(cfh
[i
]));
1457 fprintf(stderr
, "create column family error: %s\n",
1458 s
.ToString().c_str());
1462 num_created
.store(new_num_created
, std::memory_order_release
);
1466 // a class that reports stats to CSV file
1467 class ReporterAgent
{
1469 ReporterAgent(Env
* env
, const std::string
& fname
,
1470 uint64_t report_interval_secs
)
1474 report_interval_secs_(report_interval_secs
),
1476 auto s
= env_
->NewWritableFile(fname
, &report_file_
, EnvOptions());
1478 s
= report_file_
->Append(Header() + "\n");
1481 s
= report_file_
->Flush();
1484 fprintf(stderr
, "Can't open %s: %s\n", fname
.c_str(),
1485 s
.ToString().c_str());
1489 reporting_thread_
= port::Thread([&]() { SleepAndReport(); });
1494 std::unique_lock
<std::mutex
> lk(mutex_
);
1496 stop_cv_
.notify_all();
1498 reporting_thread_
.join();
1502 void ReportFinishedOps(int64_t num_ops
) {
1503 total_ops_done_
.fetch_add(num_ops
);
1507 std::string
Header() const { return "secs_elapsed,interval_qps"; }
1508 void SleepAndReport() {
1509 uint64_t kMicrosInSecond
= 1000 * 1000;
1510 auto time_started
= env_
->NowMicros();
1513 std::unique_lock
<std::mutex
> lk(mutex_
);
1515 stop_cv_
.wait_for(lk
, std::chrono::seconds(report_interval_secs_
),
1516 [&]() { return stop_
; })) {
1520 // else -> timeout, which means time for a report!
1522 auto total_ops_done_snapshot
= total_ops_done_
.load();
1523 // round the seconds elapsed
1525 (env_
->NowMicros() - time_started
+ kMicrosInSecond
/ 2) /
1527 std::string report
= ToString(secs_elapsed
) + "," +
1528 ToString(total_ops_done_snapshot
- last_report_
) +
1530 auto s
= report_file_
->Append(report
);
1532 s
= report_file_
->Flush();
1536 "Can't write to report file (%s), stopping the reporting\n",
1537 s
.ToString().c_str());
1540 last_report_
= total_ops_done_snapshot
;
1545 std::unique_ptr
<WritableFile
> report_file_
;
1546 std::atomic
<int64_t> total_ops_done_
;
1547 int64_t last_report_
;
1548 const uint64_t report_interval_secs_
;
1549 rocksdb::port::Thread reporting_thread_
;
1551 // will notify on stop
1552 std::condition_variable stop_cv_
;
1556 enum OperationType
: unsigned char {
1570 static std::unordered_map
<OperationType
, std::string
, std::hash
<unsigned char>>
1571 OperationTypeString
= {
1574 {kDelete
, "delete"},
1577 {kUpdate
, "update"},
1578 {kCompress
, "compress"},
1579 {kCompress
, "uncompress"},
1585 class CombinedStats
;
1590 uint64_t sine_interval_
;
1594 uint64_t last_report_done_
;
1595 uint64_t next_report_
;
1597 uint64_t last_op_finish_
;
1598 uint64_t last_report_finish_
;
1599 std::unordered_map
<OperationType
, std::shared_ptr
<HistogramImpl
>,
1600 std::hash
<unsigned char>> hist_
;
1601 std::string message_
;
1602 bool exclude_from_merge_
;
1603 ReporterAgent
* reporter_agent_
; // does not own
1604 friend class CombinedStats
;
1607 Stats() { Start(-1); }
1609 void SetReporterAgent(ReporterAgent
* reporter_agent
) {
1610 reporter_agent_
= reporter_agent
;
1613 void Start(int id
) {
1615 next_report_
= FLAGS_stats_interval
? FLAGS_stats_interval
: 100;
1616 last_op_finish_
= start_
;
1619 last_report_done_
= 0;
1622 start_
= FLAGS_env
->NowMicros();
1623 sine_interval_
= FLAGS_env
->NowMicros();
1625 last_report_finish_
= start_
;
1627 // When set, stats from this thread won't be merged with others.
1628 exclude_from_merge_
= false;
1631 void Merge(const Stats
& other
) {
1632 if (other
.exclude_from_merge_
)
1635 for (auto it
= other
.hist_
.begin(); it
!= other
.hist_
.end(); ++it
) {
1636 auto this_it
= hist_
.find(it
->first
);
1637 if (this_it
!= hist_
.end()) {
1638 this_it
->second
->Merge(*(other
.hist_
.at(it
->first
)));
1640 hist_
.insert({ it
->first
, it
->second
});
1644 done_
+= other
.done_
;
1645 bytes_
+= other
.bytes_
;
1646 seconds_
+= other
.seconds_
;
1647 if (other
.start_
< start_
) start_
= other
.start_
;
1648 if (other
.finish_
> finish_
) finish_
= other
.finish_
;
1650 // Just keep the messages from one thread
1651 if (message_
.empty()) message_
= other
.message_
;
1655 finish_
= FLAGS_env
->NowMicros();
1656 seconds_
= (finish_
- start_
) * 1e-6;
1659 void AddMessage(Slice msg
) {
1660 AppendWithSpace(&message_
, msg
);
1663 void SetId(int id
) { id_
= id
; }
1664 void SetExcludeFromMerge() { exclude_from_merge_
= true; }
1666 void PrintThreadStatus() {
1667 std::vector
<ThreadStatus
> thread_list
;
1668 FLAGS_env
->GetThreadList(&thread_list
);
1670 fprintf(stderr
, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
1671 "ThreadID", "ThreadType", "cfName", "Operation",
1672 "ElapsedTime", "Stage", "State", "OperationProperties");
1674 int64_t current_time
= 0;
1675 Env::Default()->GetCurrentTime(¤t_time
);
1676 for (auto ts
: thread_list
) {
1677 fprintf(stderr
, "%18" PRIu64
" %10s %12s %20s %13s %45s %12s",
1679 ThreadStatus::GetThreadTypeName(ts
.thread_type
).c_str(),
1681 ThreadStatus::GetOperationName(ts
.operation_type
).c_str(),
1682 ThreadStatus::MicrosToString(ts
.op_elapsed_micros
).c_str(),
1683 ThreadStatus::GetOperationStageName(ts
.operation_stage
).c_str(),
1684 ThreadStatus::GetStateName(ts
.state_type
).c_str());
1686 auto op_properties
= ThreadStatus::InterpretOperationProperties(
1687 ts
.operation_type
, ts
.op_properties
);
1688 for (const auto& op_prop
: op_properties
) {
1689 fprintf(stderr
, " %s %" PRIu64
" |",
1690 op_prop
.first
.c_str(), op_prop
.second
);
1692 fprintf(stderr
, "\n");
1696 void ResetSineInterval() {
1697 sine_interval_
= FLAGS_env
->NowMicros();
1700 uint64_t GetSineInterval() {
1701 return sine_interval_
;
1704 uint64_t GetStart() {
1708 void ResetLastOpTime() {
1709 // Set to now to avoid latency from calls to SleepForMicroseconds
1710 last_op_finish_
= FLAGS_env
->NowMicros();
1713 void FinishedOps(DBWithColumnFamilies
* db_with_cfh
, DB
* db
, int64_t num_ops
,
1714 enum OperationType op_type
= kOthers
) {
1715 if (reporter_agent_
) {
1716 reporter_agent_
->ReportFinishedOps(num_ops
);
1718 if (FLAGS_histogram
) {
1719 uint64_t now
= FLAGS_env
->NowMicros();
1720 uint64_t micros
= now
- last_op_finish_
;
1722 if (hist_
.find(op_type
) == hist_
.end())
1724 auto hist_temp
= std::make_shared
<HistogramImpl
>();
1725 hist_
.insert({op_type
, std::move(hist_temp
)});
1727 hist_
[op_type
]->Add(micros
);
1729 if (micros
> 20000 && !FLAGS_stats_interval
) {
1730 fprintf(stderr
, "long op: %" PRIu64
" micros%30s\r", micros
, "");
1733 last_op_finish_
= now
;
1737 if (done_
>= next_report_
) {
1738 if (!FLAGS_stats_interval
) {
1739 if (next_report_
< 1000) next_report_
+= 100;
1740 else if (next_report_
< 5000) next_report_
+= 500;
1741 else if (next_report_
< 10000) next_report_
+= 1000;
1742 else if (next_report_
< 50000) next_report_
+= 5000;
1743 else if (next_report_
< 100000) next_report_
+= 10000;
1744 else if (next_report_
< 500000) next_report_
+= 50000;
1745 else next_report_
+= 100000;
1746 fprintf(stderr
, "... finished %" PRIu64
" ops%30s\r", done_
, "");
1748 uint64_t now
= FLAGS_env
->NowMicros();
1749 int64_t usecs_since_last
= now
- last_report_finish_
;
1751 // Determine whether to print status where interval is either
1752 // each N operations or each N seconds.
1754 if (FLAGS_stats_interval_seconds
&&
1755 usecs_since_last
< (FLAGS_stats_interval_seconds
* 1000000)) {
1756 // Don't check again for this many operations
1757 next_report_
+= FLAGS_stats_interval
;
1762 "%s ... thread %d: (%" PRIu64
",%" PRIu64
") ops and "
1763 "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
1764 FLAGS_env
->TimeToString(now
/1000000).c_str(),
1766 done_
- last_report_done_
, done_
,
1767 (done_
- last_report_done_
) /
1768 (usecs_since_last
/ 1000000.0),
1769 done_
/ ((now
- start_
) / 1000000.0),
1770 (now
- last_report_finish_
) / 1000000.0,
1771 (now
- start_
) / 1000000.0);
1773 if (id_
== 0 && FLAGS_stats_per_interval
) {
1776 if (db_with_cfh
&& db_with_cfh
->num_created
.load()) {
1777 for (size_t i
= 0; i
< db_with_cfh
->num_created
.load(); ++i
) {
1778 if (db
->GetProperty(db_with_cfh
->cfh
[i
], "rocksdb.cfstats",
1780 fprintf(stderr
, "%s\n", stats
.c_str());
1781 if (FLAGS_show_table_properties
) {
1782 for (int level
= 0; level
< FLAGS_num_levels
; ++level
) {
1783 if (db
->GetProperty(
1784 db_with_cfh
->cfh
[i
],
1785 "rocksdb.aggregated-table-properties-at-level" +
1788 if (stats
.find("# entries=0") == std::string::npos
) {
1789 fprintf(stderr
, "Level[%d]: %s\n", level
,
1797 if (db
->GetProperty("rocksdb.stats", &stats
)) {
1798 fprintf(stderr
, "%s\n", stats
.c_str());
1800 if (FLAGS_show_table_properties
) {
1801 for (int level
= 0; level
< FLAGS_num_levels
; ++level
) {
1802 if (db
->GetProperty(
1803 "rocksdb.aggregated-table-properties-at-level" +
1806 if (stats
.find("# entries=0") == std::string::npos
) {
1807 fprintf(stderr
, "Level[%d]: %s\n", level
, stats
.c_str());
1815 next_report_
+= FLAGS_stats_interval
;
1816 last_report_finish_
= now
;
1817 last_report_done_
= done_
;
1820 if (id_
== 0 && FLAGS_thread_status_per_interval
) {
1821 PrintThreadStatus();
1827 void AddBytes(int64_t n
) {
1831 void Report(const Slice
& name
) {
1832 // Pretend at least one op was done in case we are running a benchmark
1833 // that does not call FinishedOps().
1834 if (done_
< 1) done_
= 1;
1838 // Rate is computed on actual elapsed time, not the sum of per-thread
1840 double elapsed
= (finish_
- start_
) * 1e-6;
1842 snprintf(rate
, sizeof(rate
), "%6.1f MB/s",
1843 (bytes_
/ 1048576.0) / elapsed
);
1846 AppendWithSpace(&extra
, message_
);
1847 double elapsed
= (finish_
- start_
) * 1e-6;
1848 double throughput
= (double)done_
/elapsed
;
1850 fprintf(stdout
, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
1851 name
.ToString().c_str(),
1852 seconds_
* 1e6
/ done_
,
1854 (extra
.empty() ? "" : " "),
1856 if (FLAGS_histogram
) {
1857 for (auto it
= hist_
.begin(); it
!= hist_
.end(); ++it
) {
1858 fprintf(stdout
, "Microseconds per %s:\n%s\n",
1859 OperationTypeString
[it
->first
].c_str(),
1860 it
->second
->ToString().c_str());
1863 if (FLAGS_report_file_operations
) {
1864 ReportFileOpEnv
* env
= static_cast<ReportFileOpEnv
*>(FLAGS_env
);
1865 ReportFileOpCounters
* counters
= env
->counters();
1866 fprintf(stdout
, "Num files opened: %d\n",
1867 counters
->open_counter_
.load(std::memory_order_relaxed
));
1868 fprintf(stdout
, "Num Read(): %d\n",
1869 counters
->read_counter_
.load(std::memory_order_relaxed
));
1870 fprintf(stdout
, "Num Append(): %d\n",
1871 counters
->append_counter_
.load(std::memory_order_relaxed
));
1872 fprintf(stdout
, "Num bytes read: %" PRIu64
"\n",
1873 counters
->bytes_read_
.load(std::memory_order_relaxed
));
1874 fprintf(stdout
, "Num bytes written: %" PRIu64
"\n",
1875 counters
->bytes_written_
.load(std::memory_order_relaxed
));
1882 class CombinedStats
{
1884 void AddStats(const Stats
& stat
) {
1885 uint64_t total_ops
= stat
.done_
;
1886 uint64_t total_bytes_
= stat
.bytes_
;
1889 if (total_ops
< 1) {
1893 elapsed
= (stat
.finish_
- stat
.start_
) * 1e-6;
1894 throughput_ops_
.emplace_back(total_ops
/ elapsed
);
1896 if (total_bytes_
> 0) {
1897 double mbs
= (total_bytes_
/ 1048576.0);
1898 throughput_mbs_
.emplace_back(mbs
/ elapsed
);
1902 void Report(const std::string
& bench_name
) {
1903 const char* name
= bench_name
.c_str();
1904 int num_runs
= static_cast<int>(throughput_ops_
.size());
1906 if (throughput_mbs_
.size() == throughput_ops_
.size()) {
1908 "%s [AVG %d runs] : %d ops/sec; %6.1f MB/sec\n"
1909 "%s [MEDIAN %d runs] : %d ops/sec; %6.1f MB/sec\n",
1910 name
, num_runs
, static_cast<int>(CalcAvg(throughput_ops_
)),
1911 CalcAvg(throughput_mbs_
), name
, num_runs
,
1912 static_cast<int>(CalcMedian(throughput_ops_
)),
1913 CalcMedian(throughput_mbs_
));
1916 "%s [AVG %d runs] : %d ops/sec\n"
1917 "%s [MEDIAN %d runs] : %d ops/sec\n",
1918 name
, num_runs
, static_cast<int>(CalcAvg(throughput_ops_
)), name
,
1919 num_runs
, static_cast<int>(CalcMedian(throughput_ops_
)));
// Arithmetic mean of `data`. Callers (Report) only invoke this with at
// least one recorded run; an empty vector would divide by zero.
double CalcAvg(std::vector<double> data) {
  double avg = 0;
  for (double x : data) {
    avg += x;
  }
  avg = avg / data.size();
  return avg;
}
// Median of `data`; requires a non-empty vector. The parameter is taken
// by value because std::sort reorders it, leaving the caller's copy
// untouched.
double CalcMedian(std::vector<double> data) {
  assert(data.size() > 0);
  std::sort(data.begin(), data.end());

  size_t mid = data.size() / 2;
  if (data.size() % 2 == 1) {
    // Odd number of entries
    return data[mid];
  } else {
    // Even number of entries
    return (data[mid] + data[mid - 1]) / 2;
  }
}
1947 std::vector
<double> throughput_ops_
;
1948 std::vector
<double> throughput_mbs_
;
// Monotonic logical clock used by time-based benchmarks (see KeyExpired()
// and the "timeseries" workload): Get() reads and Inc() bumps an atomic
// counter starting at 0. Atomicity makes concurrent Inc()/Get() safe, but
// no ordering beyond the atomic's default is established.
// NOTE(review): line-elided extraction — access specifiers and the closing
// brace of the class are not visible here.
1951 class TimestampEmulator
{
1953 std::atomic
<uint64_t> timestamp_
;
1956 TimestampEmulator() : timestamp_(0) {}
1957 uint64_t Get() const { return timestamp_
.load(); }
1958 void Inc() { timestamp_
++; }
1961 // State shared by all concurrent executions of the same benchmark.
1962 struct SharedState
{
1967 std::shared_ptr
<RateLimiter
> write_rate_limiter
;
1968 std::shared_ptr
<RateLimiter
> read_rate_limiter
;
1970 // Each thread goes through the following states:
1972 // (2) waiting for others to be initialized
1976 long num_initialized
;
1980 SharedState() : cv(&mu
), perf_level(FLAGS_perf_level
) { }
1983 // Per-thread state for concurrent executions of the same benchmark.
1984 struct ThreadState
{
1985 int tid
; // 0..n-1 when running in n threads
1986 Random64 rand
; // Has different seeds for different threads
1988 SharedState
* shared
;
1990 /* implicit */ ThreadState(int index
)
1992 rand((FLAGS_seed
? FLAGS_seed
: 1000) + index
) {
1998 Duration(uint64_t max_seconds
, int64_t max_ops
, int64_t ops_per_stage
= 0) {
1999 max_seconds_
= max_seconds
;
2001 ops_per_stage_
= (ops_per_stage
> 0) ? ops_per_stage
: max_ops
;
2003 start_at_
= FLAGS_env
->NowMicros();
// Current stage index: completed ops divided by ops_per_stage_, clamped so
// that finishing exactly max_ops_ still reports the last stage rather than
// one past it.
2006 int64_t GetStage() { return std::min(ops_
, max_ops_
- 1) / ops_per_stage_
; }
// Records `increment` completed ops and reports whether the benchmark should
// stop. In time-limited mode the wall clock (FLAGS_env->NowMicros()) is
// consulted only once per ~FLAGS_ops_between_duration_checks ops to keep the
// clock read off the hot path; otherwise the op count is compared against
// max_ops_. Non-positive increments are coerced to 1 to guarantee progress.
// NOTE(review): line-elided extraction — the `ops_ += increment;` update and
// the time-limited/op-limited branch structure are partly not visible here.
2008 bool Done(int64_t increment
) {
2009 if (increment
<= 0) increment
= 1; // avoid Done(0) and infinite loops
2013 // Recheck every appx 1000 ops (exact iff increment is factor of 1000)
2014 auto granularity
= FLAGS_ops_between_duration_checks
;
2015 if ((ops_
/ granularity
) != ((ops_
- increment
) / granularity
)) {
2016 uint64_t now
= FLAGS_env
->NowMicros();
2017 return ((now
- start_at_
) / 1000000) >= max_seconds_
;
2022 return ops_
> max_ops_
;
2027 uint64_t max_seconds_
;
2029 int64_t ops_per_stage_
;
2036 std::shared_ptr
<Cache
> cache_
;
2037 std::shared_ptr
<Cache
> compressed_cache_
;
2038 std::shared_ptr
<const FilterPolicy
> filter_policy_
;
2039 const SliceTransform
* prefix_extractor_
;
2040 DBWithColumnFamilies db_
;
2041 std::vector
<DBWithColumnFamilies
> multi_dbs_
;
2046 int64_t keys_per_prefix_
;
2047 int64_t entries_per_batch_
;
2048 int64_t writes_before_delete_range_
;
2049 int64_t writes_per_range_tombstone_
;
2050 int64_t range_tombstone_width_
;
2051 int64_t max_num_range_tombstones_
;
2052 WriteOptions write_options_
;
2053 Options open_options_
; // keep options around to properly destroy db later
2054 #ifndef ROCKSDB_LITE
2055 TraceOptions trace_options_
;
2059 double read_random_exp_range_
;
2061 int64_t readwrites_
;
2062 int64_t merge_keys_
;
2063 bool report_file_operations_
;
2065 std::vector
<std::string
> keys_
;
2067 class ErrorHandlerListener
: public EventListener
{
2069 #ifndef ROCKSDB_LITE
2070 ErrorHandlerListener()
2073 no_auto_recovery_(false),
2074 recovery_complete_(false) {}
2076 ~ErrorHandlerListener() override
{}
2078 void OnErrorRecoveryBegin(BackgroundErrorReason
/*reason*/,
2079 Status
/*bg_error*/,
2080 bool* auto_recovery
) override
{
2081 if (*auto_recovery
&& no_auto_recovery_
) {
2082 *auto_recovery
= false;
2086 void OnErrorRecoveryCompleted(Status
/*old_bg_error*/) override
{
2087 InstrumentedMutexLock
l(&mutex_
);
2088 recovery_complete_
= true;
2092 bool WaitForRecovery(uint64_t /*abs_time_us*/) {
2093 InstrumentedMutexLock
l(&mutex_
);
2094 if (!recovery_complete_
) {
2095 cv_
.Wait(/*abs_time_us*/);
2097 if (recovery_complete_
) {
2098 recovery_complete_
= false;
2104 void EnableAutoRecovery(bool enable
= true) { no_auto_recovery_
= !enable
; }
2107 InstrumentedMutex mutex_
;
2108 InstrumentedCondVar cv_
;
2109 bool no_auto_recovery_
;
2110 bool recovery_complete_
;
2111 #else // ROCKSDB_LITE
2112 bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
2113 void EnableAutoRecovery(bool /*enable*/) {}
2114 #endif // ROCKSDB_LITE
2117 std::shared_ptr
<ErrorHandlerListener
> listener_
;
// Validates command-line flags before running any benchmark; rejects a
// compression ratio above 1 (it is interpreted as a fraction of the value
// size that survives compression).
// NOTE(review): line-elided extraction — the `return false;` / `return true;`
// statements are not visible here.
2119 bool SanityCheck() {
2120 if (FLAGS_compression_ratio
> 1) {
2121 fprintf(stderr
, "compression_ratio should be between 0 and 1\n");
2127 inline bool CompressSlice(const CompressionInfo
& compression_info
,
2128 const Slice
& input
, std::string
* compressed
) {
2130 switch (FLAGS_compression_type_e
) {
2131 case rocksdb::kSnappyCompression
:
2132 ok
= Snappy_Compress(compression_info
, input
.data(), input
.size(),
2135 case rocksdb::kZlibCompression
:
2136 ok
= Zlib_Compress(compression_info
, 2, input
.data(), input
.size(),
2139 case rocksdb::kBZip2Compression
:
2140 ok
= BZip2_Compress(compression_info
, 2, input
.data(), input
.size(),
2143 case rocksdb::kLZ4Compression
:
2144 ok
= LZ4_Compress(compression_info
, 2, input
.data(), input
.size(),
2147 case rocksdb::kLZ4HCCompression
:
2148 ok
= LZ4HC_Compress(compression_info
, 2, input
.data(), input
.size(),
2151 case rocksdb::kXpressCompression
:
2152 ok
= XPRESS_Compress(input
.data(),
2153 input
.size(), compressed
);
2155 case rocksdb::kZSTD
:
2156 ok
= ZSTD_Compress(compression_info
, input
.data(), input
.size(),
2165 void PrintHeader() {
2167 fprintf(stdout
, "Keys: %d bytes each\n", FLAGS_key_size
);
2168 fprintf(stdout
, "Values: %d bytes each (%d bytes after compression)\n",
2170 static_cast<int>(FLAGS_value_size
* FLAGS_compression_ratio
+ 0.5));
2171 fprintf(stdout
, "Entries: %" PRIu64
"\n", num_
);
2172 fprintf(stdout
, "Prefix: %d bytes\n", FLAGS_prefix_size
);
2173 fprintf(stdout
, "Keys per prefix: %" PRIu64
"\n", keys_per_prefix_
);
2174 fprintf(stdout
, "RawSize: %.1f MB (estimated)\n",
2175 ((static_cast<int64_t>(FLAGS_key_size
+ FLAGS_value_size
) * num_
)
2177 fprintf(stdout
, "FileSize: %.1f MB (estimated)\n",
2178 (((FLAGS_key_size
+ FLAGS_value_size
* FLAGS_compression_ratio
)
2181 fprintf(stdout
, "Write rate: %" PRIu64
" bytes/second\n",
2182 FLAGS_benchmark_write_rate_limit
);
2183 fprintf(stdout
, "Read rate: %" PRIu64
" ops/second\n",
2184 FLAGS_benchmark_read_rate_limit
);
2185 if (FLAGS_enable_numa
) {
2186 fprintf(stderr
, "Running in NUMA enabled mode.\n");
2188 fprintf(stderr
, "NUMA is not defined in the system.\n");
2191 if (numa_available() == -1) {
2192 fprintf(stderr
, "NUMA is not supported by the system.\n");
2198 auto compression
= CompressionTypeToString(FLAGS_compression_type_e
);
2199 fprintf(stdout
, "Compression: %s\n", compression
.c_str());
2200 fprintf(stdout
, "Compression sampling rate: %" PRId64
"\n",
2201 FLAGS_sample_for_compression
);
2203 switch (FLAGS_rep_factory
) {
2205 fprintf(stdout
, "Memtablerep: prefix_hash\n");
2208 fprintf(stdout
, "Memtablerep: skip_list\n");
2211 fprintf(stdout
, "Memtablerep: vector\n");
2213 case kHashLinkedList
:
2214 fprintf(stdout
, "Memtablerep: hash_linkedlist\n");
2217 fprintf(stdout
, "Perf Level: %d\n", FLAGS_perf_level
);
2219 PrintWarnings(compression
.c_str());
2220 fprintf(stdout
, "------------------------------------------------\n");
2223 void PrintWarnings(const char* compression
) {
2224 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
2226 "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
2231 "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
2233 if (FLAGS_compression_type_e
!= rocksdb::kNoCompression
) {
2234 // The test string should not be too small.
2235 const int len
= FLAGS_block_size
;
2236 std::string
input_str(len
, 'y');
2237 std::string compressed
;
2238 CompressionOptions opts
;
2239 CompressionContext
context(FLAGS_compression_type_e
);
2240 CompressionInfo
info(opts
, context
, CompressionDict::GetEmptyDict(),
2241 FLAGS_compression_type_e
,
2242 FLAGS_sample_for_compression
);
2243 bool result
= CompressSlice(info
, Slice(input_str
), &compressed
);
2246 fprintf(stdout
, "WARNING: %s compression is not enabled\n",
2248 } else if (compressed
.size() >= input_str
.size()) {
2249 fprintf(stdout
, "WARNING: %s compression is not effective\n",
2255 // Current the following isn't equivalent to OS_LINUX.
2256 #if defined(__linux)
// Returns a sub-Slice of `s` with leading and trailing isspace() characters
// removed. Operates on indices only — the returned Slice aliases the caller's
// buffer (no copy), so it is only valid while `s`'s data lives.
// NOTE(review): line-elided extraction — the `start++;` / `limit--;` loop
// bodies are not visible here.
2257 static Slice
TrimSpace(Slice s
) {
2258 unsigned int start
= 0;
2259 while (start
< s
.size() && isspace(s
[start
])) {
2262 unsigned int limit
= static_cast<unsigned int>(s
.size());
2263 while (limit
> start
&& isspace(s
[limit
-1])) {
2266 return Slice(s
.data() + start
, limit
- start
);
2270 void PrintEnvironment() {
2271 fprintf(stderr
, "RocksDB: version %d.%d\n",
2272 kMajorVersion
, kMinorVersion
);
2274 #if defined(__linux)
2275 time_t now
= time(nullptr);
2277 // Lint complains about ctime() usage, so replace it with ctime_r(). The
2278 // requirement is to provide a buffer which is at least 26 bytes.
2279 fprintf(stderr
, "Date: %s",
2280 ctime_r(&now
, buf
)); // ctime_r() adds newline
2282 FILE* cpuinfo
= fopen("/proc/cpuinfo", "r");
2283 if (cpuinfo
!= nullptr) {
2286 std::string cpu_type
;
2287 std::string cache_size
;
2288 while (fgets(line
, sizeof(line
), cpuinfo
) != nullptr) {
2289 const char* sep
= strchr(line
, ':');
2290 if (sep
== nullptr) {
2293 Slice key
= TrimSpace(Slice(line
, sep
- 1 - line
));
2294 Slice val
= TrimSpace(Slice(sep
+ 1));
2295 if (key
== "model name") {
2297 cpu_type
= val
.ToString();
2298 } else if (key
== "cache size") {
2299 cache_size
= val
.ToString();
2303 fprintf(stderr
, "CPU: %d * %s\n", num_cpus
, cpu_type
.c_str());
2304 fprintf(stderr
, "CPUCache: %s\n", cache_size
.c_str());
// Decides whether a key has expired for the "timeseries" workload: the first
// 8 bytes of the key encode a big-endian timestamp, which is compared against
// the emulator's current logical time with FLAGS_time_range as the TTL.
// On little-endian hosts the bytes are assembled manually (byte-swapping);
// otherwise a raw memcpy suffices because the on-disk order already matches.
// NOTE(review): line-elided extraction — the second parameter (presumably
// `const Slice& key`, referenced below as `key`) is not visible here.
// NOTE(review): `×tamp` two lines into the memcpy is an HTML-entity
// decoding artifact: `&times` + `tamp` was originally `&timestamp`. Restore
// `memcpy(&timestamp, pos, sizeof(timestamp));` before compiling.
2309 static bool KeyExpired(const TimestampEmulator
* timestamp_emulator
,
2311 const char* pos
= key
.data();
2313 uint64_t timestamp
= 0;
2314 if (port::kLittleEndian
) {
2315 int bytes_to_fill
= 8;
2316 for (int i
= 0; i
< bytes_to_fill
; ++i
) {
2317 timestamp
|= (static_cast<uint64_t>(static_cast<unsigned char>(pos
[i
]))
2318 << ((bytes_to_fill
- i
- 1) << 3));
2321 memcpy(&times
tamp
, pos
, sizeof(timestamp
));
2323 return timestamp_emulator
->Get() - timestamp
> FLAGS_time_range
;
// CompactionFilter that drops expired entries during compaction for the
// "timeseries" workload: Filter() returns true (meaning "remove this key")
// exactly when KeyExpired() says the key's embedded timestamp is older than
// FLAGS_time_range relative to the shared TimestampEmulator. Holds a
// shared_ptr so the emulator outlives the filter.
// NOTE(review): line-elided extraction — access specifiers and the closing
// brace of the class are not visible here.
2326 class ExpiredTimeFilter
: public CompactionFilter
{
2328 explicit ExpiredTimeFilter(
2329 const std::shared_ptr
<TimestampEmulator
>& timestamp_emulator
)
2330 : timestamp_emulator_(timestamp_emulator
) {}
2331 bool Filter(int /*level*/, const Slice
& key
,
2332 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
2333 bool* /*value_changed*/) const override
{
2334 return KeyExpired(timestamp_emulator_
.get(), key
);
2336 const char* Name() const override
{ return "ExpiredTimeFilter"; }
2339 std::shared_ptr
<TimestampEmulator
> timestamp_emulator_
;
// No-op CompactionFilter used to measure filter-invocation overhead: it
// inspects nothing and keeps every entry.
// NOTE(review): line-elided extraction — Filter()'s return statement
// (presumably `return false;`, i.e. keep) and the class's closing brace are
// not visible here; confirm against the full source.
2342 class KeepFilter
: public CompactionFilter
{
2344 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
& /*value*/,
2345 std::string
* /*new_value*/,
2346 bool* /*value_changed*/) const override
{
2350 const char* Name() const override
{ return "KeepFilter"; }
// Builds the block cache from flags: ClockCache when --use_clock_cache is
// set (with an error to stderr if the platform build lacks it), otherwise an
// LRU cache with the configured shard bits and high-priority pool ratio.
// A non-positive capacity disables caching.
// NOTE(review): line-elided extraction — the `return nullptr;` early exits
// and the clock-cache success return are not visible here.
2353 std::shared_ptr
<Cache
> NewCache(int64_t capacity
) {
2354 if (capacity
<= 0) {
2357 if (FLAGS_use_clock_cache
) {
2358 auto cache
= NewClockCache((size_t)capacity
, FLAGS_cache_numshardbits
);
2360 fprintf(stderr
, "Clock cache not supported.");
2365 return NewLRUCache((size_t)capacity
, FLAGS_cache_numshardbits
,
2366 false /*strict_capacity_limit*/,
2367 FLAGS_cache_high_pri_pool_ratio
);
2373 : cache_(NewCache(FLAGS_cache_size
)),
2374 compressed_cache_(NewCache(FLAGS_compressed_cache_size
)),
2375 filter_policy_(FLAGS_bloom_bits
>= 0
2376 ? NewBloomFilterPolicy(FLAGS_bloom_bits
,
2377 FLAGS_use_block_based_filter
)
2379 prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size
)),
2381 value_size_(FLAGS_value_size
),
2382 key_size_(FLAGS_key_size
),
2383 prefix_size_(FLAGS_prefix_size
),
2384 keys_per_prefix_(FLAGS_keys_per_prefix
),
2385 entries_per_batch_(1),
2386 reads_(FLAGS_reads
< 0 ? FLAGS_num
: FLAGS_reads
),
2387 read_random_exp_range_(0.0),
2388 writes_(FLAGS_writes
< 0 ? FLAGS_num
: FLAGS_writes
),
2390 (FLAGS_writes
< 0 && FLAGS_reads
< 0)
2392 : ((FLAGS_writes
> FLAGS_reads
) ? FLAGS_writes
: FLAGS_reads
)),
2393 merge_keys_(FLAGS_merge_keys
< 0 ? FLAGS_num
: FLAGS_merge_keys
),
2394 report_file_operations_(FLAGS_report_file_operations
),
2395 #ifndef ROCKSDB_LITE
2396 use_blob_db_(FLAGS_use_blob_db
)
2399 #endif // !ROCKSDB_LITE
2401 // use simcache instead of cache
2402 if (FLAGS_simcache_size
>= 0) {
2403 if (FLAGS_cache_numshardbits
>= 1) {
2405 NewSimCache(cache_
, FLAGS_simcache_size
, FLAGS_cache_numshardbits
);
2407 cache_
= NewSimCache(cache_
, FLAGS_simcache_size
, 0);
2411 if (report_file_operations_
) {
2412 if (!FLAGS_hdfs
.empty()) {
2414 "--hdfs and --report_file_operations cannot be enabled "
2415 "at the same time");
2418 FLAGS_env
= new ReportFileOpEnv(rocksdb::Env::Default());
2421 if (FLAGS_prefix_size
> FLAGS_key_size
) {
2422 fprintf(stderr
, "prefix size is larger than key size");
2426 std::vector
<std::string
> files
;
2427 FLAGS_env
->GetChildren(FLAGS_db
, &files
);
2428 for (size_t i
= 0; i
< files
.size(); i
++) {
2429 if (Slice(files
[i
]).starts_with("heap-")) {
2430 FLAGS_env
->DeleteFile(FLAGS_db
+ "/" + files
[i
]);
2433 if (!FLAGS_use_existing_db
) {
2435 if (!FLAGS_wal_dir
.empty()) {
2436 options
.wal_dir
= FLAGS_wal_dir
;
2438 #ifndef ROCKSDB_LITE
2440 blob_db::DestroyBlobDB(FLAGS_db
, options
, blob_db::BlobDBOptions());
2442 #endif // !ROCKSDB_LITE
2443 DestroyDB(FLAGS_db
, options
);
2444 if (!FLAGS_wal_dir
.empty()) {
2445 FLAGS_env
->DeleteDir(FLAGS_wal_dir
);
2448 if (FLAGS_num_multi_db
> 1) {
2449 FLAGS_env
->CreateDir(FLAGS_db
);
2450 if (!FLAGS_wal_dir
.empty()) {
2451 FLAGS_env
->CreateDir(FLAGS_wal_dir
);
2456 listener_
.reset(new ErrorHandlerListener());
2461 delete prefix_extractor_
;
2462 if (cache_
.get() != nullptr) {
2463 // this will leak, but we're shutting down so nobody cares
2464 cache_
->DisownData();
// Allocates a key_size_-byte buffer, hands ownership to the caller via
// `key_guard` (so the buffer outlives the returned view), and returns a
// Slice aliasing it. The Slice is only valid while *key_guard holds the
// allocation.
2468 Slice
AllocateKey(std::unique_ptr
<const char[]>* key_guard
) {
2469 char* data
= new char[key_size_
];
2470 const char* const_data
= data
;
2471 key_guard
->reset(const_data
);
2472 return Slice(key_guard
->get(), key_size_
);
2475 // Generate key according to the given specification and random number.
2476 // The resulting key will have the following format (if keys_per_prefix_
2477 // is positive), extra trailing bytes are either cut off or padded with '0'.
2478 // The prefix value is derived from key value.
2479 // ----------------------------
2480 // | prefix 00000 | key 00000 |
2481 // ----------------------------
2482 // If keys_per_prefix_ is 0, the key is simply a binary representation of
2483 // random number followed by trailing '0's
2484 // ----------------------------
2486 // ----------------------------
2487 void GenerateKeyFromInt(uint64_t v
, int64_t num_keys
, Slice
* key
) {
2488 if (!keys_
.empty()) {
2489 assert(FLAGS_use_existing_keys
);
2490 assert(keys_
.size() == static_cast<size_t>(num_keys
));
2491 assert(v
< static_cast<uint64_t>(num_keys
));
2495 char* start
= const_cast<char*>(key
->data());
2497 if (keys_per_prefix_
> 0) {
2498 int64_t num_prefix
= num_keys
/ keys_per_prefix_
;
2499 int64_t prefix
= v
% num_prefix
;
2500 int bytes_to_fill
= std::min(prefix_size_
, 8);
2501 if (port::kLittleEndian
) {
2502 for (int i
= 0; i
< bytes_to_fill
; ++i
) {
2503 pos
[i
] = (prefix
>> ((bytes_to_fill
- i
- 1) << 3)) & 0xFF;
2506 memcpy(pos
, static_cast<void*>(&prefix
), bytes_to_fill
);
2508 if (prefix_size_
> 8) {
2509 // fill the rest with 0s
2510 memset(pos
+ 8, '0', prefix_size_
- 8);
2512 pos
+= prefix_size_
;
2515 int bytes_to_fill
= std::min(key_size_
- static_cast<int>(pos
- start
), 8);
2516 if (port::kLittleEndian
) {
2517 for (int i
= 0; i
< bytes_to_fill
; ++i
) {
2518 pos
[i
] = (v
>> ((bytes_to_fill
- i
- 1) << 3)) & 0xFF;
2521 memcpy(pos
, static_cast<void*>(&v
), bytes_to_fill
);
2523 pos
+= bytes_to_fill
;
2524 if (key_size_
> pos
- start
) {
2525 memset(pos
, '0', key_size_
- (pos
- start
));
// Derives the per-instance path for multi-DB runs by appending the numeric
// `id` to `base_name`, first ensuring the base ends with a path separator
// ('/' or, on Windows-style paths, '\\').
// NOTE(review): line-elided extraction — the statements that actually append
// the separator inside the two `if`s are not visible here.
2529 std::string
GetPathForMultiple(std::string base_name
, size_t id
) {
2530 if (!base_name
.empty()) {
2532 if (base_name
.back() != '/') {
2536 if (base_name
.back() != '\\') {
2541 return base_name
+ ToString(id
);
2544 void VerifyDBFromDB(std::string
& truth_db_name
) {
2545 DBWithColumnFamilies truth_db
;
2546 auto s
= DB::OpenForReadOnly(open_options_
, truth_db_name
, &truth_db
.db
);
2548 fprintf(stderr
, "open error: %s\n", s
.ToString().c_str());
2552 ro
.total_order_seek
= true;
2553 std::unique_ptr
<Iterator
> truth_iter(truth_db
.db
->NewIterator(ro
));
2554 std::unique_ptr
<Iterator
> db_iter(db_
.db
->NewIterator(ro
));
2555 // Verify that all the key/values in truth_db are retrivable in db with ::Get
2556 fprintf(stderr
, "Verifying db >= truth_db with ::Get...\n");
2557 for (truth_iter
->SeekToFirst(); truth_iter
->Valid(); truth_iter
->Next()) {
2559 s
= db_
.db
->Get(ro
, truth_iter
->key(), &value
);
2561 // TODO(myabandeh): provide debugging hints
2562 assert(Slice(value
) == truth_iter
->value());
2564 // Verify that the db iterator does not give any extra key/value
2565 fprintf(stderr
, "Verifying db == truth_db...\n");
2566 for (db_iter
->SeekToFirst(), truth_iter
->SeekToFirst(); db_iter
->Valid(); db_iter
->Next(), truth_iter
->Next()) {
2567 assert(truth_iter
->Valid());
2568 assert(truth_iter
->value() == db_iter
->value());
2570 // No more key should be left unchecked in truth_db
2571 assert(!truth_iter
->Valid());
2572 fprintf(stderr
, "...Verified\n");
2576 if (!SanityCheck()) {
2579 Open(&open_options_
);
2581 std::stringstream
benchmark_stream(FLAGS_benchmarks
);
2583 std::unique_ptr
<ExpiredTimeFilter
> filter
;
2584 while (std::getline(benchmark_stream
, name
, ',')) {
2585 // Sanitize parameters
2587 reads_
= (FLAGS_reads
< 0 ? FLAGS_num
: FLAGS_reads
);
2588 writes_
= (FLAGS_writes
< 0 ? FLAGS_num
: FLAGS_writes
);
2589 deletes_
= (FLAGS_deletes
< 0 ? FLAGS_num
: FLAGS_deletes
);
2590 value_size_
= FLAGS_value_size
;
2591 key_size_
= FLAGS_key_size
;
2592 entries_per_batch_
= FLAGS_batch_size
;
2593 writes_before_delete_range_
= FLAGS_writes_before_delete_range
;
2594 writes_per_range_tombstone_
= FLAGS_writes_per_range_tombstone
;
2595 range_tombstone_width_
= FLAGS_range_tombstone_width
;
2596 max_num_range_tombstones_
= FLAGS_max_num_range_tombstones
;
2597 write_options_
= WriteOptions();
2598 read_random_exp_range_
= FLAGS_read_random_exp_range
;
2600 write_options_
.sync
= true;
2602 write_options_
.disableWAL
= FLAGS_disable_wal
;
2604 void (Benchmark::*method
)(ThreadState
*) = nullptr;
2605 void (Benchmark::*post_process_method
)() = nullptr;
2607 bool fresh_db
= false;
2608 int num_threads
= FLAGS_threads
;
2612 if (!name
.empty() && *name
.rbegin() == ']') {
2613 auto it
= name
.find('[');
2614 if (it
== std::string::npos
) {
2615 fprintf(stderr
, "unknown benchmark arguments '%s'\n", name
.c_str());
2618 std::string args
= name
.substr(it
+ 1);
2619 args
.resize(args
.size() - 1);
2622 std::string bench_arg
;
2623 std::stringstream
args_stream(args
);
2624 while (std::getline(args_stream
, bench_arg
, '-')) {
2625 if (bench_arg
.empty()) {
2628 if (bench_arg
[0] == 'X') {
2629 // Repeat the benchmark n times
2630 std::string num_str
= bench_arg
.substr(1);
2631 num_repeat
= std::stoi(num_str
);
2632 } else if (bench_arg
[0] == 'W') {
2633 // Warm up the benchmark for n times
2634 std::string num_str
= bench_arg
.substr(1);
2635 num_warmup
= std::stoi(num_str
);
2640 // Both fillseqdeterministic and filluniquerandomdeterministic
2641 // fill the levels except the max level with UNIQUE_RANDOM
2642 // and fill the max level with fillseq and filluniquerandom, respectively
2643 if (name
== "fillseqdeterministic" ||
2644 name
== "filluniquerandomdeterministic") {
2645 if (!FLAGS_disable_auto_compactions
) {
2647 "Please disable_auto_compactions in FillDeterministic "
2651 if (num_threads
> 1) {
2653 "filldeterministic multithreaded not supported"
2654 ", use 1 thread\n");
2658 if (name
== "fillseqdeterministic") {
2659 method
= &Benchmark::WriteSeqDeterministic
;
2661 method
= &Benchmark::WriteUniqueRandomDeterministic
;
2663 } else if (name
== "fillseq") {
2665 method
= &Benchmark::WriteSeq
;
2666 } else if (name
== "fillbatch") {
2668 entries_per_batch_
= 1000;
2669 method
= &Benchmark::WriteSeq
;
2670 } else if (name
== "fillrandom") {
2672 method
= &Benchmark::WriteRandom
;
2673 } else if (name
== "filluniquerandom") {
2675 if (num_threads
> 1) {
2677 "filluniquerandom multithreaded not supported"
2681 method
= &Benchmark::WriteUniqueRandom
;
2682 } else if (name
== "overwrite") {
2683 method
= &Benchmark::WriteRandom
;
2684 } else if (name
== "fillsync") {
2687 write_options_
.sync
= true;
2688 method
= &Benchmark::WriteRandom
;
2689 } else if (name
== "fill100K") {
2692 value_size_
= 100 * 1000;
2693 method
= &Benchmark::WriteRandom
;
2694 } else if (name
== "readseq") {
2695 method
= &Benchmark::ReadSequential
;
2696 } else if (name
== "readtocache") {
2697 method
= &Benchmark::ReadSequential
;
2700 } else if (name
== "readreverse") {
2701 method
= &Benchmark::ReadReverse
;
2702 } else if (name
== "readrandom") {
2703 method
= &Benchmark::ReadRandom
;
2704 } else if (name
== "readrandomfast") {
2705 method
= &Benchmark::ReadRandomFast
;
2706 } else if (name
== "multireadrandom") {
2707 fprintf(stderr
, "entries_per_batch = %" PRIi64
"\n",
2708 entries_per_batch_
);
2709 method
= &Benchmark::MultiReadRandom
;
2710 } else if (name
== "mixgraph") {
2711 method
= &Benchmark::MixGraph
;
2712 } else if (name
== "readmissing") {
2714 method
= &Benchmark::ReadRandom
;
2715 } else if (name
== "newiterator") {
2716 method
= &Benchmark::IteratorCreation
;
2717 } else if (name
== "newiteratorwhilewriting") {
2718 num_threads
++; // Add extra thread for writing
2719 method
= &Benchmark::IteratorCreationWhileWriting
;
2720 } else if (name
== "seekrandom") {
2721 method
= &Benchmark::SeekRandom
;
2722 } else if (name
== "seekrandomwhilewriting") {
2723 num_threads
++; // Add extra thread for writing
2724 method
= &Benchmark::SeekRandomWhileWriting
;
2725 } else if (name
== "seekrandomwhilemerging") {
2726 num_threads
++; // Add extra thread for merging
2727 method
= &Benchmark::SeekRandomWhileMerging
;
2728 } else if (name
== "readrandomsmall") {
2730 method
= &Benchmark::ReadRandom
;
2731 } else if (name
== "deleteseq") {
2732 method
= &Benchmark::DeleteSeq
;
2733 } else if (name
== "deleterandom") {
2734 method
= &Benchmark::DeleteRandom
;
2735 } else if (name
== "readwhilewriting") {
2736 num_threads
++; // Add extra thread for writing
2737 method
= &Benchmark::ReadWhileWriting
;
2738 } else if (name
== "readwhilemerging") {
2739 num_threads
++; // Add extra thread for writing
2740 method
= &Benchmark::ReadWhileMerging
;
2741 } else if (name
== "readwhilescanning") {
2742 num_threads
++; // Add extra thread for scaning
2743 method
= &Benchmark::ReadWhileScanning
;
2744 } else if (name
== "readrandomwriterandom") {
2745 method
= &Benchmark::ReadRandomWriteRandom
;
2746 } else if (name
== "readrandommergerandom") {
2747 if (FLAGS_merge_operator
.empty()) {
2748 fprintf(stdout
, "%-12s : skipped (--merge_operator is unknown)\n",
2752 method
= &Benchmark::ReadRandomMergeRandom
;
2753 } else if (name
== "updaterandom") {
2754 method
= &Benchmark::UpdateRandom
;
2755 } else if (name
== "xorupdaterandom") {
2756 method
= &Benchmark::XORUpdateRandom
;
2757 } else if (name
== "appendrandom") {
2758 method
= &Benchmark::AppendRandom
;
2759 } else if (name
== "mergerandom") {
2760 if (FLAGS_merge_operator
.empty()) {
2761 fprintf(stdout
, "%-12s : skipped (--merge_operator is unknown)\n",
2765 method
= &Benchmark::MergeRandom
;
2766 } else if (name
== "randomwithverify") {
2767 method
= &Benchmark::RandomWithVerify
;
2768 } else if (name
== "fillseekseq") {
2769 method
= &Benchmark::WriteSeqSeekSeq
;
2770 } else if (name
== "compact") {
2771 method
= &Benchmark::Compact
;
2772 } else if (name
== "compactall") {
2774 } else if (name
== "crc32c") {
2775 method
= &Benchmark::Crc32c
;
2776 } else if (name
== "xxhash") {
2777 method
= &Benchmark::xxHash
;
2778 } else if (name
== "acquireload") {
2779 method
= &Benchmark::AcquireLoad
;
2780 } else if (name
== "compress") {
2781 method
= &Benchmark::Compress
;
2782 } else if (name
== "uncompress") {
2783 method
= &Benchmark::Uncompress
;
2784 #ifndef ROCKSDB_LITE
2785 } else if (name
== "randomtransaction") {
2786 method
= &Benchmark::RandomTransaction
;
2787 post_process_method
= &Benchmark::RandomTransactionVerify
;
2788 #endif // ROCKSDB_LITE
2789 } else if (name
== "randomreplacekeys") {
2791 method
= &Benchmark::RandomReplaceKeys
;
2792 } else if (name
== "timeseries") {
2793 timestamp_emulator_
.reset(new TimestampEmulator());
2794 if (FLAGS_expire_style
== "compaction_filter") {
2795 filter
.reset(new ExpiredTimeFilter(timestamp_emulator_
));
2796 fprintf(stdout
, "Compaction filter is used to remove expired data");
2797 open_options_
.compaction_filter
= filter
.get();
2800 method
= &Benchmark::TimeSeries
;
2801 } else if (name
== "stats") {
2802 PrintStats("rocksdb.stats");
2803 } else if (name
== "resetstats") {
2805 } else if (name
== "verify") {
2806 VerifyDBFromDB(FLAGS_truth_db
);
2807 } else if (name
== "levelstats") {
2808 PrintStats("rocksdb.levelstats");
2809 } else if (name
== "sstables") {
2810 PrintStats("rocksdb.sstables");
2811 } else if (name
== "replay") {
2812 if (num_threads
> 1) {
2813 fprintf(stderr
, "Multi-threaded replay is not yet supported\n");
2816 if (FLAGS_trace_file
== "") {
2817 fprintf(stderr
, "Please set --trace_file to be replayed from\n");
2820 method
= &Benchmark::Replay
;
2821 } else if (!name
.empty()) { // No error message for empty name
2822 fprintf(stderr
, "unknown benchmark '%s'\n", name
.c_str());
2827 if (FLAGS_use_existing_db
) {
2828 fprintf(stdout
, "%-12s : skipped (--use_existing_db is true)\n",
2832 if (db_
.db
!= nullptr) {
2834 DestroyDB(FLAGS_db
, open_options_
);
2836 Options options
= open_options_
;
2837 for (size_t i
= 0; i
< multi_dbs_
.size(); i
++) {
2838 delete multi_dbs_
[i
].db
;
2839 if (!open_options_
.wal_dir
.empty()) {
2840 options
.wal_dir
= GetPathForMultiple(open_options_
.wal_dir
, i
);
2842 DestroyDB(GetPathForMultiple(FLAGS_db
, i
), options
);
2846 Open(&open_options_
); // use open_options for the last accessed
2849 if (method
!= nullptr) {
2850 fprintf(stdout
, "DB path: [%s]\n", FLAGS_db
.c_str());
2852 #ifndef ROCKSDB_LITE
2853 // A trace_file option can be provided both for trace and replay
2854 // operations. But db_bench does not support tracing and replaying at
2855 // the same time, for now. So, start tracing only when it is not a
2857 if (FLAGS_trace_file
!= "" && name
!= "replay") {
2858 std::unique_ptr
<TraceWriter
> trace_writer
;
2859 Status s
= NewFileTraceWriter(FLAGS_env
, EnvOptions(),
2860 FLAGS_trace_file
, &trace_writer
);
2862 fprintf(stderr
, "Encountered an error starting a trace, %s\n",
2863 s
.ToString().c_str());
2866 s
= db_
.db
->StartTrace(trace_options_
, std::move(trace_writer
));
2868 fprintf(stderr
, "Encountered an error starting a trace, %s\n",
2869 s
.ToString().c_str());
2872 fprintf(stdout
, "Tracing the workload to: [%s]\n",
2873 FLAGS_trace_file
.c_str());
2875 #endif // ROCKSDB_LITE
2877 if (num_warmup
> 0) {
2878 printf("Warming up benchmark by running %d times\n", num_warmup
);
2881 for (int i
= 0; i
< num_warmup
; i
++) {
2882 RunBenchmark(num_threads
, name
, method
);
2885 if (num_repeat
> 1) {
2886 printf("Running benchmark for %d times\n", num_repeat
);
2889 CombinedStats combined_stats
;
2890 for (int i
= 0; i
< num_repeat
; i
++) {
2891 Stats stats
= RunBenchmark(num_threads
, name
, method
);
2892 combined_stats
.AddStats(stats
);
2894 if (num_repeat
> 1) {
2895 combined_stats
.Report(name
);
2898 if (post_process_method
!= nullptr) {
2899 (this->*post_process_method
)();
2903 #ifndef ROCKSDB_LITE
2904 if (name
!= "replay" && FLAGS_trace_file
!= "") {
2905 Status s
= db_
.db
->EndTrace();
2907 fprintf(stderr
, "Encountered an error ending the trace, %s\n",
2908 s
.ToString().c_str());
2911 #endif // ROCKSDB_LITE
2913 if (FLAGS_statistics
) {
2914 fprintf(stdout
, "STATISTICS:\n%s\n", dbstats
->ToString().c_str());
2916 if (FLAGS_simcache_size
>= 0) {
2917 fprintf(stdout
, "SIMULATOR CACHE STATISTICS:\n%s\n",
2918 static_cast_with_check
<SimCache
, Cache
>(cache_
.get())
2925 std::shared_ptr
<TimestampEmulator
> timestamp_emulator_
;
2929 SharedState
* shared
;
2930 ThreadState
* thread
;
2931 void (Benchmark::*method
)(ThreadState
*);
2934 static void ThreadBody(void* v
) {
2935 ThreadArg
* arg
= reinterpret_cast<ThreadArg
*>(v
);
2936 SharedState
* shared
= arg
->shared
;
2937 ThreadState
* thread
= arg
->thread
;
2939 MutexLock
l(&shared
->mu
);
2940 shared
->num_initialized
++;
2941 if (shared
->num_initialized
>= shared
->total
) {
2942 shared
->cv
.SignalAll();
2944 while (!shared
->start
) {
2949 SetPerfLevel(static_cast<PerfLevel
> (shared
->perf_level
));
2950 perf_context
.EnablePerLevelPerfContext();
2951 thread
->stats
.Start(thread
->tid
);
2952 (arg
->bm
->*(arg
->method
))(thread
);
2953 thread
->stats
.Stop();
2956 MutexLock
l(&shared
->mu
);
2958 if (shared
->num_done
>= shared
->total
) {
2959 shared
->cv
.SignalAll();
2964 Stats
RunBenchmark(int n
, Slice name
,
2965 void (Benchmark::*method
)(ThreadState
*)) {
2968 shared
.num_initialized
= 0;
2969 shared
.num_done
= 0;
2970 shared
.start
= false;
2971 if (FLAGS_benchmark_write_rate_limit
> 0) {
2972 shared
.write_rate_limiter
.reset(
2973 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit
));
2975 if (FLAGS_benchmark_read_rate_limit
> 0) {
2976 shared
.read_rate_limiter
.reset(NewGenericRateLimiter(
2977 FLAGS_benchmark_read_rate_limit
, 100000 /* refill_period_us */,
2978 10 /* fairness */, RateLimiter::Mode::kReadsOnly
));
2981 std::unique_ptr
<ReporterAgent
> reporter_agent
;
2982 if (FLAGS_report_interval_seconds
> 0) {
2983 reporter_agent
.reset(new ReporterAgent(FLAGS_env
, FLAGS_report_file
,
2984 FLAGS_report_interval_seconds
));
2987 ThreadArg
* arg
= new ThreadArg
[n
];
2989 for (int i
= 0; i
< n
; i
++) {
2991 if (FLAGS_enable_numa
) {
2992 // Performs a local allocation of memory to threads in numa node.
2993 int n_nodes
= numa_num_task_nodes(); // Number of nodes in NUMA.
2994 numa_exit_on_error
= 1;
2995 int numa_node
= i
% n_nodes
;
2996 bitmask
* nodes
= numa_allocate_nodemask();
2997 numa_bitmask_clearall(nodes
);
2998 numa_bitmask_setbit(nodes
, numa_node
);
2999 // numa_bind() call binds the process to the node and these
3000 // properties are passed on to the thread that is created in
3001 // StartThread method called later in the loop.
3004 numa_free_nodemask(nodes
);
3008 arg
[i
].method
= method
;
3009 arg
[i
].shared
= &shared
;
3010 arg
[i
].thread
= new ThreadState(i
);
3011 arg
[i
].thread
->stats
.SetReporterAgent(reporter_agent
.get());
3012 arg
[i
].thread
->shared
= &shared
;
3013 FLAGS_env
->StartThread(ThreadBody
, &arg
[i
]);
3017 while (shared
.num_initialized
< n
) {
3021 shared
.start
= true;
3022 shared
.cv
.SignalAll();
3023 while (shared
.num_done
< n
) {
3028 // Stats for some threads can be excluded.
3030 for (int i
= 0; i
< n
; i
++) {
3031 merge_stats
.Merge(arg
[i
].thread
->stats
);
3033 merge_stats
.Report(name
);
3035 for (int i
= 0; i
< n
; i
++) {
3036 delete arg
[i
].thread
;
// Pure-CPU benchmark: repeatedly CRC32C-checksums a FLAGS_block_size buffer
// of 'x' bytes until ~500 MB have been processed, reporting one finished op
// per checksum. The final crc is printed to stderr so the optimizer cannot
// eliminate the loop as dead code.
// NOTE(review): line-elided extraction — the declarations of `bytes` and
// `crc` and the `bytes += size;` update are not visible here.
3043 void Crc32c(ThreadState
* thread
) {
3044 // Checksum about 500MB of data total
3045 const int size
= FLAGS_block_size
; // use --block_size option for db_bench
3046 std::string labels
= "(" + ToString(FLAGS_block_size
) + " per op)";
3047 const char* label
= labels
.c_str();
3049 std::string
data(size
, 'x');
3052 while (bytes
< 500 * 1048576) {
3053 crc
= crc32c::Value(data
.data(), size
);
3054 thread
->stats
.FinishedOps(nullptr, nullptr, 1, kCrc
);
3057 // Print so result is not dead
3058 fprintf(stderr
, "... crc=0x%x\r", static_cast<unsigned int>(crc
));
3060 thread
->stats
.AddBytes(bytes
);
3061 thread
->stats
.AddMessage(label
);
// Pure-CPU benchmark, parallel to Crc32c(): repeatedly XXH32-hashes a fixed
// 4 KB buffer of 'x' bytes until ~500 MB have been processed, one finished
// op per hash. The final digest is printed to stderr to keep the loop from
// being optimized away.
// NOTE(review): line-elided extraction — the declaration of `bytes` and the
// `bytes += size;` update are not visible here.
3064 void xxHash(ThreadState
* thread
) {
3065 // Checksum about 500MB of data total
3066 const int size
= 4096;
3067 const char* label
= "(4K per op)";
3068 std::string
data(size
, 'x');
3070 unsigned int xxh32
= 0;
3071 while (bytes
< 500 * 1048576) {
3072 xxh32
= XXH32(data
.data(), size
, 0);
3073 thread
->stats
.FinishedOps(nullptr, nullptr, 1, kHash
);
3076 // Print so result is not dead
3077 fprintf(stderr
, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32
));
3079 thread
->stats
.AddBytes(bytes
);
3080 thread
->stats
.AddMessage(label
);
// Benchmark op: measures acquire-ordered atomic load cost; each recorded
// op is a batch of 1000 loads from a std::atomic<void*>.
// NOTE(review): mangled extraction -- the declarations of `dummy` and
// `count` (original lines 3084, 3086), the `count++` / loop closers and
// the function's closing brace are missing from this view.
3083 void AcquireLoad(ThreadState
* thread
) {
3085 std::atomic
<void*> ap(&dummy
);
3087 void *ptr
= nullptr;
3088 thread
->stats
.AddMessage("(each op is 1000 loads)");
3089 while (count
< 100000) {
3090 for (int i
= 0; i
< 1000; i
++) {
3091 ptr
= ap
.load(std::memory_order_acquire
);
3094 thread
->stats
.FinishedOps(nullptr, nullptr, 1, kOthers
);
3096 if (ptr
== nullptr) exit(1); // Disable unused variable warning.
// Benchmark op: compress the same FLAGS_block_size input slice repeatedly
// until 1GB of input has been consumed; reports the output/input ratio, or
// a "(compression failure)" message when CompressSlice reports an error.
// NOTE(review): mangled extraction -- the declarations of `ok`, `bytes`
// and `buf`, the success/failure branch structure and the closing braces
// (e.g. original lines 3102, 3104, 3111, 3118-3120, 3128) are missing
// from this view.
3099 void Compress(ThreadState
*thread
) {
3100 RandomGenerator gen
;
3101 Slice input
= gen
.Generate(FLAGS_block_size
);
3103 int64_t produced
= 0;
3105 std::string compressed
;
// Compression configuration built from --compression_type and
// --sample_for_compression, with an empty dictionary.
3106 CompressionOptions opts
;
3107 CompressionContext
context(FLAGS_compression_type_e
);
3108 CompressionInfo
info(opts
, context
, CompressionDict::GetEmptyDict(),
3109 FLAGS_compression_type_e
,
3110 FLAGS_sample_for_compression
);
3112 while (ok
&& bytes
< int64_t(1) << 30) {
3114 ok
= CompressSlice(info
, input
, &compressed
);
3115 produced
+= compressed
.size();
3116 bytes
+= input
.size();
3117 thread
->stats
.FinishedOps(nullptr, nullptr, 1, kCompress
);
3121 thread
->stats
.AddMessage("(compression failure)");
// Report compressed output as a percentage of input consumed.
3124 snprintf(buf
, sizeof(buf
), "(output: %.1f%%)",
3125 (produced
* 100.0) / bytes
);
3126 thread
->stats
.AddMessage(buf
);
3127 thread
->stats
.AddBytes(bytes
);
3131 void Uncompress(ThreadState
*thread
) {
3132 RandomGenerator gen
;
3133 Slice input
= gen
.Generate(FLAGS_block_size
);
3134 std::string compressed
;
3136 CompressionContext
compression_ctx(FLAGS_compression_type_e
);
3137 CompressionOptions compression_opts
;
3138 CompressionInfo
compression_info(
3139 compression_opts
, compression_ctx
, CompressionDict::GetEmptyDict(),
3140 FLAGS_compression_type_e
, FLAGS_sample_for_compression
);
3141 UncompressionContext
uncompression_ctx(FLAGS_compression_type_e
);
3142 UncompressionInfo
uncompression_info(uncompression_ctx
,
3143 UncompressionDict::GetEmptyDict(),
3144 FLAGS_compression_type_e
);
3146 bool ok
= CompressSlice(compression_info
, input
, &compressed
);
3148 int decompress_size
;
3149 while (ok
&& bytes
< 1024 * 1048576) {
3150 CacheAllocationPtr uncompressed
;
3151 switch (FLAGS_compression_type_e
) {
3152 case rocksdb::kSnappyCompression
: {
3153 // get size and allocate here to make comparison fair
3155 if (!Snappy_GetUncompressedLength(compressed
.data(),
3156 compressed
.size(), &ulength
)) {
3160 uncompressed
= AllocateBlock(ulength
, nullptr);
3161 ok
= Snappy_Uncompress(compressed
.data(), compressed
.size(),
3162 uncompressed
.get());
3165 case rocksdb::kZlibCompression
:
3166 uncompressed
= Zlib_Uncompress(uncompression_info
, compressed
.data(),
3167 compressed
.size(), &decompress_size
, 2);
3168 ok
= uncompressed
.get() != nullptr;
3170 case rocksdb::kBZip2Compression
:
3171 uncompressed
= BZip2_Uncompress(compressed
.data(), compressed
.size(),
3172 &decompress_size
, 2);
3173 ok
= uncompressed
.get() != nullptr;
3175 case rocksdb::kLZ4Compression
:
3176 uncompressed
= LZ4_Uncompress(uncompression_info
, compressed
.data(),
3177 compressed
.size(), &decompress_size
, 2);
3178 ok
= uncompressed
.get() != nullptr;
3180 case rocksdb::kLZ4HCCompression
:
3181 uncompressed
= LZ4_Uncompress(uncompression_info
, compressed
.data(),
3182 compressed
.size(), &decompress_size
, 2);
3183 ok
= uncompressed
.get() != nullptr;
3185 case rocksdb::kXpressCompression
:
3186 uncompressed
.reset(XPRESS_Uncompress(
3187 compressed
.data(), compressed
.size(), &decompress_size
));
3188 ok
= uncompressed
.get() != nullptr;
3190 case rocksdb::kZSTD
:
3191 uncompressed
= ZSTD_Uncompress(uncompression_info
, compressed
.data(),
3192 compressed
.size(), &decompress_size
);
3193 ok
= uncompressed
.get() != nullptr;
3198 bytes
+= input
.size();
3199 thread
->stats
.FinishedOps(nullptr, nullptr, 1, kUncompress
);
3203 thread
->stats
.AddMessage("(compression failure)");
3205 thread
->stats
.AddBytes(bytes
);
3209 // Returns true if the options is initialized from the specified
// options file (--options_file); presumably returns false otherwise so the
// caller (Open) falls back to flag-based initialization -- TODO confirm:
// the return statements are among the lines missing from this extraction.
// NOTE(review): mangled extraction -- the `db_opts` declaration, the
// trailing arguments of LoadOptionsFromFile, the success/error branch
// structure and the ROCKSDB_LITE #else/#endif arms are missing here.
3211 bool InitializeOptionsFromFile(Options
* opts
) {
3212 #ifndef ROCKSDB_LITE
3213 printf("Initializing RocksDB Options from the specified file\n");
3215 std::vector
<ColumnFamilyDescriptor
> cf_descs
;
3216 if (FLAGS_options_file
!= "") {
3217 auto s
= LoadOptionsFromFile(FLAGS_options_file
, Env::Default(), &db_opts
,
// On success, rebuild *opts from the loaded DBOptions plus the first
// column family's options.
3220 *opts
= Options(db_opts
, cf_descs
[0].options
);
3223 fprintf(stderr
, "Unable to load options file %s --- %s\n",
3224 FLAGS_options_file
.c_str(), s
.ToString().c_str());
3233 void InitializeOptionsFromFlags(Options
* opts
) {
3234 printf("Initializing RocksDB Options from command-line flags\n");
3235 Options
& options
= *opts
;
3237 assert(db_
.db
== nullptr);
3239 options
.max_open_files
= FLAGS_open_files
;
3240 if (FLAGS_cost_write_buffer_to_cache
|| FLAGS_db_write_buffer_size
!= 0) {
3241 options
.write_buffer_manager
.reset(
3242 new WriteBufferManager(FLAGS_db_write_buffer_size
, cache_
));
3244 options
.write_buffer_size
= FLAGS_write_buffer_size
;
3245 options
.max_write_buffer_number
= FLAGS_max_write_buffer_number
;
3246 options
.min_write_buffer_number_to_merge
=
3247 FLAGS_min_write_buffer_number_to_merge
;
3248 options
.max_write_buffer_number_to_maintain
=
3249 FLAGS_max_write_buffer_number_to_maintain
;
3250 options
.max_background_jobs
= FLAGS_max_background_jobs
;
3251 options
.max_background_compactions
= FLAGS_max_background_compactions
;
3252 options
.max_subcompactions
= static_cast<uint32_t>(FLAGS_subcompactions
);
3253 options
.max_background_flushes
= FLAGS_max_background_flushes
;
3254 options
.compaction_style
= FLAGS_compaction_style_e
;
3255 options
.compaction_pri
= FLAGS_compaction_pri_e
;
3256 options
.allow_mmap_reads
= FLAGS_mmap_read
;
3257 options
.allow_mmap_writes
= FLAGS_mmap_write
;
3258 options
.use_direct_reads
= FLAGS_use_direct_reads
;
3259 options
.use_direct_io_for_flush_and_compaction
=
3260 FLAGS_use_direct_io_for_flush_and_compaction
;
3261 #ifndef ROCKSDB_LITE
3262 options
.ttl
= FLAGS_fifo_compaction_ttl
;
3263 options
.compaction_options_fifo
= CompactionOptionsFIFO(
3264 FLAGS_fifo_compaction_max_table_files_size_mb
* 1024 * 1024,
3265 FLAGS_fifo_compaction_allow_compaction
);
3266 #endif // ROCKSDB_LITE
3267 if (FLAGS_prefix_size
!= 0) {
3268 options
.prefix_extractor
.reset(
3269 NewFixedPrefixTransform(FLAGS_prefix_size
));
3271 if (FLAGS_use_uint64_comparator
) {
3272 options
.comparator
= test::Uint64Comparator();
3273 if (FLAGS_key_size
!= 8) {
3274 fprintf(stderr
, "Using Uint64 comparator but key size is not 8.\n");
3278 if (FLAGS_use_stderr_info_logger
) {
3279 options
.info_log
.reset(new StderrLogger());
3281 options
.memtable_huge_page_size
= FLAGS_memtable_use_huge_page
? 2048 : 0;
3282 options
.memtable_prefix_bloom_size_ratio
= FLAGS_memtable_bloom_size_ratio
;
3283 options
.memtable_whole_key_filtering
= FLAGS_memtable_whole_key_filtering
;
3284 if (FLAGS_memtable_insert_with_hint_prefix_size
> 0) {
3285 options
.memtable_insert_with_hint_prefix_extractor
.reset(
3286 NewCappedPrefixTransform(
3287 FLAGS_memtable_insert_with_hint_prefix_size
));
3289 options
.bloom_locality
= FLAGS_bloom_locality
;
3290 options
.max_file_opening_threads
= FLAGS_file_opening_threads
;
3291 options
.new_table_reader_for_compaction_inputs
=
3292 FLAGS_new_table_reader_for_compaction_inputs
;
3293 options
.compaction_readahead_size
= FLAGS_compaction_readahead_size
;
3294 options
.random_access_max_buffer_size
= FLAGS_random_access_max_buffer_size
;
3295 options
.writable_file_max_buffer_size
= FLAGS_writable_file_max_buffer_size
;
3296 options
.use_fsync
= FLAGS_use_fsync
;
3297 options
.num_levels
= FLAGS_num_levels
;
3298 options
.target_file_size_base
= FLAGS_target_file_size_base
;
3299 options
.target_file_size_multiplier
= FLAGS_target_file_size_multiplier
;
3300 options
.max_bytes_for_level_base
= FLAGS_max_bytes_for_level_base
;
3301 options
.level_compaction_dynamic_level_bytes
=
3302 FLAGS_level_compaction_dynamic_level_bytes
;
3303 options
.max_bytes_for_level_multiplier
=
3304 FLAGS_max_bytes_for_level_multiplier
;
3305 if ((FLAGS_prefix_size
== 0) && (FLAGS_rep_factory
== kPrefixHash
||
3306 FLAGS_rep_factory
== kHashLinkedList
)) {
3307 fprintf(stderr
, "prefix_size should be non-zero if PrefixHash or "
3308 "HashLinkedList memtablerep is used\n");
3311 switch (FLAGS_rep_factory
) {
3313 options
.memtable_factory
.reset(new SkipListFactory(
3314 FLAGS_skip_list_lookahead
));
3316 #ifndef ROCKSDB_LITE
3318 options
.memtable_factory
.reset(
3319 NewHashSkipListRepFactory(FLAGS_hash_bucket_count
));
3321 case kHashLinkedList
:
3322 options
.memtable_factory
.reset(NewHashLinkListRepFactory(
3323 FLAGS_hash_bucket_count
));
3326 options
.memtable_factory
.reset(
3327 new VectorRepFactory
3332 fprintf(stderr
, "Only skip list is supported in lite mode\n");
3334 #endif // ROCKSDB_LITE
3336 if (FLAGS_use_plain_table
) {
3337 #ifndef ROCKSDB_LITE
3338 if (FLAGS_rep_factory
!= kPrefixHash
&&
3339 FLAGS_rep_factory
!= kHashLinkedList
) {
3340 fprintf(stderr
, "Waring: plain table is used with skipList\n");
3343 int bloom_bits_per_key
= FLAGS_bloom_bits
;
3344 if (bloom_bits_per_key
< 0) {
3345 bloom_bits_per_key
= 0;
3348 PlainTableOptions plain_table_options
;
3349 plain_table_options
.user_key_len
= FLAGS_key_size
;
3350 plain_table_options
.bloom_bits_per_key
= bloom_bits_per_key
;
3351 plain_table_options
.hash_table_ratio
= 0.75;
3352 options
.table_factory
= std::shared_ptr
<TableFactory
>(
3353 NewPlainTableFactory(plain_table_options
));
3355 fprintf(stderr
, "Plain table is not supported in lite mode\n");
3357 #endif // ROCKSDB_LITE
3358 } else if (FLAGS_use_cuckoo_table
) {
3359 #ifndef ROCKSDB_LITE
3360 if (FLAGS_cuckoo_hash_ratio
> 1 || FLAGS_cuckoo_hash_ratio
< 0) {
3361 fprintf(stderr
, "Invalid cuckoo_hash_ratio\n");
3365 if (!FLAGS_mmap_read
) {
3366 fprintf(stderr
, "cuckoo table format requires mmap read to operate\n");
3370 rocksdb::CuckooTableOptions table_options
;
3371 table_options
.hash_table_ratio
= FLAGS_cuckoo_hash_ratio
;
3372 table_options
.identity_as_first_hash
= FLAGS_identity_as_first_hash
;
3373 options
.table_factory
= std::shared_ptr
<TableFactory
>(
3374 NewCuckooTableFactory(table_options
));
3376 fprintf(stderr
, "Cuckoo table is not supported in lite mode\n");
3378 #endif // ROCKSDB_LITE
3380 BlockBasedTableOptions block_based_options
;
3381 if (FLAGS_use_hash_search
) {
3382 if (FLAGS_prefix_size
== 0) {
3384 "prefix_size not assigned when enable use_hash_search \n");
3387 block_based_options
.index_type
= BlockBasedTableOptions::kHashSearch
;
3389 block_based_options
.index_type
= BlockBasedTableOptions::kBinarySearch
;
3391 if (FLAGS_partition_index_and_filters
|| FLAGS_partition_index
) {
3392 if (FLAGS_use_hash_search
) {
3394 "use_hash_search is incompatible with "
3395 "partition index and is ignored");
3397 block_based_options
.index_type
=
3398 BlockBasedTableOptions::kTwoLevelIndexSearch
;
3399 block_based_options
.metadata_block_size
= FLAGS_metadata_block_size
;
3400 if (FLAGS_partition_index_and_filters
) {
3401 block_based_options
.partition_filters
= true;
3404 if (cache_
== nullptr) {
3405 block_based_options
.no_block_cache
= true;
3407 block_based_options
.cache_index_and_filter_blocks
=
3408 FLAGS_cache_index_and_filter_blocks
;
3409 block_based_options
.pin_l0_filter_and_index_blocks_in_cache
=
3410 FLAGS_pin_l0_filter_and_index_blocks_in_cache
;
3411 block_based_options
.pin_top_level_index_and_filter
=
3412 FLAGS_pin_top_level_index_and_filter
;
3413 if (FLAGS_cache_high_pri_pool_ratio
> 1e-6) { // > 0.0 + eps
3414 block_based_options
.cache_index_and_filter_blocks_with_high_priority
=
3417 block_based_options
.block_cache
= cache_
;
3418 block_based_options
.block_cache_compressed
= compressed_cache_
;
3419 block_based_options
.block_size
= FLAGS_block_size
;
3420 block_based_options
.block_restart_interval
= FLAGS_block_restart_interval
;
3421 block_based_options
.index_block_restart_interval
=
3422 FLAGS_index_block_restart_interval
;
3423 block_based_options
.filter_policy
= filter_policy_
;
3424 block_based_options
.format_version
=
3425 static_cast<uint32_t>(FLAGS_format_version
);
3426 block_based_options
.read_amp_bytes_per_bit
= FLAGS_read_amp_bytes_per_bit
;
3427 block_based_options
.enable_index_compression
=
3428 FLAGS_enable_index_compression
;
3429 block_based_options
.block_align
= FLAGS_block_align
;
3430 if (FLAGS_use_data_block_hash_index
) {
3431 block_based_options
.data_block_index_type
=
3432 rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash
;
3434 block_based_options
.data_block_index_type
=
3435 rocksdb::BlockBasedTableOptions::kDataBlockBinarySearch
;
3437 block_based_options
.data_block_hash_table_util_ratio
=
3438 FLAGS_data_block_hash_table_util_ratio
;
3439 if (FLAGS_read_cache_path
!= "") {
3440 #ifndef ROCKSDB_LITE
3443 // Read cache need to be provided with a the Logger, we will put all
3444 // reac cache logs in the read cache path in a file named rc_LOG
3445 rc_status
= FLAGS_env
->CreateDirIfMissing(FLAGS_read_cache_path
);
3446 std::shared_ptr
<Logger
> read_cache_logger
;
3447 if (rc_status
.ok()) {
3448 rc_status
= FLAGS_env
->NewLogger(FLAGS_read_cache_path
+ "/rc_LOG",
3449 &read_cache_logger
);
3452 if (rc_status
.ok()) {
3453 PersistentCacheConfig
rc_cfg(FLAGS_env
, FLAGS_read_cache_path
,
3454 FLAGS_read_cache_size
,
3457 rc_cfg
.enable_direct_reads
= FLAGS_read_cache_direct_read
;
3458 rc_cfg
.enable_direct_writes
= FLAGS_read_cache_direct_write
;
3459 rc_cfg
.writer_qdepth
= 4;
3460 rc_cfg
.writer_dispatch_size
= 4 * 1024;
3462 auto pcache
= std::make_shared
<BlockCacheTier
>(rc_cfg
);
3463 block_based_options
.persistent_cache
= pcache
;
3464 rc_status
= pcache
->Open();
3467 if (!rc_status
.ok()) {
3468 fprintf(stderr
, "Error initializing read cache, %s\n",
3469 rc_status
.ToString().c_str());
3473 fprintf(stderr
, "Read cache is not supported in LITE\n");
3478 options
.table_factory
.reset(
3479 NewBlockBasedTableFactory(block_based_options
));
3481 if (FLAGS_max_bytes_for_level_multiplier_additional_v
.size() > 0) {
3482 if (FLAGS_max_bytes_for_level_multiplier_additional_v
.size() !=
3483 (unsigned int)FLAGS_num_levels
) {
3484 fprintf(stderr
, "Insufficient number of fanouts specified %d\n",
3485 (int)FLAGS_max_bytes_for_level_multiplier_additional_v
.size());
3488 options
.max_bytes_for_level_multiplier_additional
=
3489 FLAGS_max_bytes_for_level_multiplier_additional_v
;
3491 options
.level0_stop_writes_trigger
= FLAGS_level0_stop_writes_trigger
;
3492 options
.level0_file_num_compaction_trigger
=
3493 FLAGS_level0_file_num_compaction_trigger
;
3494 options
.level0_slowdown_writes_trigger
=
3495 FLAGS_level0_slowdown_writes_trigger
;
3496 options
.compression
= FLAGS_compression_type_e
;
3497 options
.sample_for_compression
= FLAGS_sample_for_compression
;
3498 options
.WAL_ttl_seconds
= FLAGS_wal_ttl_seconds
;
3499 options
.WAL_size_limit_MB
= FLAGS_wal_size_limit_MB
;
3500 options
.max_total_wal_size
= FLAGS_max_total_wal_size
;
3502 if (FLAGS_min_level_to_compress
>= 0) {
3503 assert(FLAGS_min_level_to_compress
<= FLAGS_num_levels
);
3504 options
.compression_per_level
.resize(FLAGS_num_levels
);
3505 for (int i
= 0; i
< FLAGS_min_level_to_compress
; i
++) {
3506 options
.compression_per_level
[i
] = kNoCompression
;
3508 for (int i
= FLAGS_min_level_to_compress
;
3509 i
< FLAGS_num_levels
; i
++) {
3510 options
.compression_per_level
[i
] = FLAGS_compression_type_e
;
3513 options
.soft_rate_limit
= FLAGS_soft_rate_limit
;
3514 options
.hard_rate_limit
= FLAGS_hard_rate_limit
;
3515 options
.soft_pending_compaction_bytes_limit
=
3516 FLAGS_soft_pending_compaction_bytes_limit
;
3517 options
.hard_pending_compaction_bytes_limit
=
3518 FLAGS_hard_pending_compaction_bytes_limit
;
3519 options
.delayed_write_rate
= FLAGS_delayed_write_rate
;
3520 options
.allow_concurrent_memtable_write
=
3521 FLAGS_allow_concurrent_memtable_write
;
3522 options
.inplace_update_support
= FLAGS_inplace_update_support
;
3523 options
.inplace_update_num_locks
= FLAGS_inplace_update_num_locks
;
3524 options
.enable_write_thread_adaptive_yield
=
3525 FLAGS_enable_write_thread_adaptive_yield
;
3526 options
.enable_pipelined_write
= FLAGS_enable_pipelined_write
;
3527 options
.write_thread_max_yield_usec
= FLAGS_write_thread_max_yield_usec
;
3528 options
.write_thread_slow_yield_usec
= FLAGS_write_thread_slow_yield_usec
;
3529 options
.rate_limit_delay_max_milliseconds
=
3530 FLAGS_rate_limit_delay_max_milliseconds
;
3531 options
.table_cache_numshardbits
= FLAGS_table_cache_numshardbits
;
3532 options
.max_compaction_bytes
= FLAGS_max_compaction_bytes
;
3533 options
.disable_auto_compactions
= FLAGS_disable_auto_compactions
;
3534 options
.optimize_filters_for_hits
= FLAGS_optimize_filters_for_hits
;
3536 // fill storage options
3537 options
.advise_random_on_open
= FLAGS_advise_random_on_open
;
3538 options
.access_hint_on_compaction_start
= FLAGS_compaction_fadvice_e
;
3539 options
.use_adaptive_mutex
= FLAGS_use_adaptive_mutex
;
3540 options
.bytes_per_sync
= FLAGS_bytes_per_sync
;
3541 options
.wal_bytes_per_sync
= FLAGS_wal_bytes_per_sync
;
3543 // merge operator options
3544 options
.merge_operator
= MergeOperators::CreateFromStringId(
3545 FLAGS_merge_operator
);
3546 if (options
.merge_operator
== nullptr && !FLAGS_merge_operator
.empty()) {
3547 fprintf(stderr
, "invalid merge operator: %s\n",
3548 FLAGS_merge_operator
.c_str());
3551 options
.max_successive_merges
= FLAGS_max_successive_merges
;
3552 options
.report_bg_io_stats
= FLAGS_report_bg_io_stats
;
3554 // set universal style compaction configurations, if applicable
3555 if (FLAGS_universal_size_ratio
!= 0) {
3556 options
.compaction_options_universal
.size_ratio
=
3557 FLAGS_universal_size_ratio
;
3559 if (FLAGS_universal_min_merge_width
!= 0) {
3560 options
.compaction_options_universal
.min_merge_width
=
3561 FLAGS_universal_min_merge_width
;
3563 if (FLAGS_universal_max_merge_width
!= 0) {
3564 options
.compaction_options_universal
.max_merge_width
=
3565 FLAGS_universal_max_merge_width
;
3567 if (FLAGS_universal_max_size_amplification_percent
!= 0) {
3568 options
.compaction_options_universal
.max_size_amplification_percent
=
3569 FLAGS_universal_max_size_amplification_percent
;
3571 if (FLAGS_universal_compression_size_percent
!= -1) {
3572 options
.compaction_options_universal
.compression_size_percent
=
3573 FLAGS_universal_compression_size_percent
;
3575 options
.compaction_options_universal
.allow_trivial_move
=
3576 FLAGS_universal_allow_trivial_move
;
3577 if (FLAGS_thread_status_per_interval
> 0) {
3578 options
.enable_thread_tracking
= true;
3581 #ifndef ROCKSDB_LITE
3582 if (FLAGS_readonly
&& FLAGS_transaction_db
) {
3583 fprintf(stderr
, "Cannot use readonly flag with transaction_db\n");
3586 #endif // ROCKSDB_LITE
3590 void InitializeOptionsGeneral(Options
* opts
) {
3591 Options
& options
= *opts
;
3593 options
.create_missing_column_families
= FLAGS_num_column_families
> 1;
3594 options
.statistics
= dbstats
;
3595 options
.wal_dir
= FLAGS_wal_dir
;
3596 options
.create_if_missing
= !FLAGS_use_existing_db
;
3597 options
.dump_malloc_stats
= FLAGS_dump_malloc_stats
;
3598 options
.stats_dump_period_sec
=
3599 static_cast<unsigned int>(FLAGS_stats_dump_period_sec
);
3600 options
.stats_persist_period_sec
=
3601 static_cast<unsigned int>(FLAGS_stats_persist_period_sec
);
3602 options
.stats_history_buffer_size
=
3603 static_cast<size_t>(FLAGS_stats_history_buffer_size
);
3605 options
.compression_opts
.level
= FLAGS_compression_level
;
3606 options
.compression_opts
.max_dict_bytes
= FLAGS_compression_max_dict_bytes
;
3607 options
.compression_opts
.zstd_max_train_bytes
=
3608 FLAGS_compression_zstd_max_train_bytes
;
3609 // If this is a block based table, set some related options
3610 if (options
.table_factory
->Name() == BlockBasedTableFactory::kName
&&
3611 options
.table_factory
->GetOptions() != nullptr) {
3612 BlockBasedTableOptions
* table_options
=
3613 reinterpret_cast<BlockBasedTableOptions
*>(
3614 options
.table_factory
->GetOptions());
3615 if (FLAGS_cache_size
) {
3616 table_options
->block_cache
= cache_
;
3618 if (FLAGS_bloom_bits
>= 0) {
3619 table_options
->filter_policy
.reset(NewBloomFilterPolicy(
3620 FLAGS_bloom_bits
, FLAGS_use_block_based_filter
));
3623 if (FLAGS_row_cache_size
) {
3624 if (FLAGS_cache_numshardbits
>= 1) {
3626 NewLRUCache(FLAGS_row_cache_size
, FLAGS_cache_numshardbits
);
3628 options
.row_cache
= NewLRUCache(FLAGS_row_cache_size
);
3631 if (FLAGS_enable_io_prio
) {
3632 FLAGS_env
->LowerThreadPoolIOPriority(Env::LOW
);
3633 FLAGS_env
->LowerThreadPoolIOPriority(Env::HIGH
);
3635 if (FLAGS_enable_cpu_prio
) {
3636 FLAGS_env
->LowerThreadPoolCPUPriority(Env::LOW
);
3637 FLAGS_env
->LowerThreadPoolCPUPriority(Env::HIGH
);
3639 options
.env
= FLAGS_env
;
3640 if (FLAGS_sine_write_rate
) {
3641 FLAGS_benchmark_write_rate_limit
= static_cast<uint64_t>(SineRate(0));
3644 if (FLAGS_rate_limiter_bytes_per_sec
> 0) {
3645 if (FLAGS_rate_limit_bg_reads
&&
3646 !FLAGS_new_table_reader_for_compaction_inputs
) {
3648 "rate limit compaction reads must have "
3649 "new_table_reader_for_compaction_inputs set\n");
3652 options
.rate_limiter
.reset(NewGenericRateLimiter(
3653 FLAGS_rate_limiter_bytes_per_sec
, 100 * 1000 /* refill_period_us */,
3655 FLAGS_rate_limit_bg_reads
? RateLimiter::Mode::kReadsOnly
3656 : RateLimiter::Mode::kWritesOnly
,
3657 FLAGS_rate_limiter_auto_tuned
));
3660 options
.listeners
.emplace_back(listener_
);
3661 if (FLAGS_num_multi_db
<= 1) {
3662 OpenDb(options
, FLAGS_db
, &db_
);
3665 multi_dbs_
.resize(FLAGS_num_multi_db
);
3666 auto wal_dir
= options
.wal_dir
;
3667 for (int i
= 0; i
< FLAGS_num_multi_db
; i
++) {
3668 if (!wal_dir
.empty()) {
3669 options
.wal_dir
= GetPathForMultiple(wal_dir
, i
);
3671 OpenDb(options
, GetPathForMultiple(FLAGS_db
, i
), &multi_dbs_
[i
]);
3673 options
.wal_dir
= wal_dir
;
3676 // KeepFilter is a noop filter, this can be used to test compaction filter
3677 if (FLAGS_use_keep_filter
) {
3678 options
.compaction_filter
= new KeepFilter();
3679 fprintf(stdout
, "A noop compaction filter is used\n");
3682 if (FLAGS_use_existing_keys
) {
3683 // Only work on single database
3684 assert(db_
.db
!= nullptr);
3685 ReadOptions read_opts
;
3686 read_opts
.total_order_seek
= true;
3687 Iterator
* iter
= db_
.db
->NewIterator(read_opts
);
3688 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
3689 keys_
.emplace_back(iter
->key().ToString());
3692 FLAGS_num
= keys_
.size();
// Open entry point: prefer options loaded from --options_file; fall back
// to command-line flags, then apply the settings common to both paths via
// InitializeOptionsGeneral.
// NOTE(review): closing braces are missing from this mangled extraction.
3696 void Open(Options
* opts
) {
3697 if (!InitializeOptionsFromFile(opts
)) {
3698 InitializeOptionsFromFlags(opts
);
3701 InitializeOptionsGeneral(opts
);
3704 void OpenDb(Options options
, const std::string
& db_name
,
3705 DBWithColumnFamilies
* db
) {
3707 // Open with column families if necessary.
3708 if (FLAGS_num_column_families
> 1) {
3709 size_t num_hot
= FLAGS_num_column_families
;
3710 if (FLAGS_num_hot_column_families
> 0 &&
3711 FLAGS_num_hot_column_families
< FLAGS_num_column_families
) {
3712 num_hot
= FLAGS_num_hot_column_families
;
3714 FLAGS_num_hot_column_families
= FLAGS_num_column_families
;
3716 std::vector
<ColumnFamilyDescriptor
> column_families
;
3717 for (size_t i
= 0; i
< num_hot
; i
++) {
3718 column_families
.push_back(ColumnFamilyDescriptor(
3719 ColumnFamilyName(i
), ColumnFamilyOptions(options
)));
3721 std::vector
<int> cfh_idx_to_prob
;
3722 if (!FLAGS_column_family_distribution
.empty()) {
3723 std::stringstream
cf_prob_stream(FLAGS_column_family_distribution
);
3724 std::string cf_prob
;
3726 while (std::getline(cf_prob_stream
, cf_prob
, ',')) {
3727 cfh_idx_to_prob
.push_back(std::stoi(cf_prob
));
3728 sum
+= cfh_idx_to_prob
.back();
3731 fprintf(stderr
, "column_family_distribution items must sum to 100\n");
3734 if (cfh_idx_to_prob
.size() != num_hot
) {
3736 "got %" ROCKSDB_PRIszt
3737 " column_family_distribution items; expected "
3738 "%" ROCKSDB_PRIszt
"\n",
3739 cfh_idx_to_prob
.size(), num_hot
);
3743 #ifndef ROCKSDB_LITE
3744 if (FLAGS_readonly
) {
3745 s
= DB::OpenForReadOnly(options
, db_name
, column_families
,
3747 } else if (FLAGS_optimistic_transaction_db
) {
3748 s
= OptimisticTransactionDB::Open(options
, db_name
, column_families
,
3749 &db
->cfh
, &db
->opt_txn_db
);
3751 db
->db
= db
->opt_txn_db
->GetBaseDB();
3753 } else if (FLAGS_transaction_db
) {
3755 TransactionDBOptions txn_db_options
;
3756 s
= TransactionDB::Open(options
, txn_db_options
, db_name
,
3757 column_families
, &db
->cfh
, &ptr
);
3762 s
= DB::Open(options
, db_name
, column_families
, &db
->cfh
, &db
->db
);
3765 s
= DB::Open(options
, db_name
, column_families
, &db
->cfh
, &db
->db
);
3766 #endif // ROCKSDB_LITE
3767 db
->cfh
.resize(FLAGS_num_column_families
);
3768 db
->num_created
= num_hot
;
3769 db
->num_hot
= num_hot
;
3770 db
->cfh_idx_to_prob
= std::move(cfh_idx_to_prob
);
3771 #ifndef ROCKSDB_LITE
3772 } else if (FLAGS_readonly
) {
3773 s
= DB::OpenForReadOnly(options
, db_name
, &db
->db
);
3774 } else if (FLAGS_optimistic_transaction_db
) {
3775 s
= OptimisticTransactionDB::Open(options
, db_name
, &db
->opt_txn_db
);
3777 db
->db
= db
->opt_txn_db
->GetBaseDB();
3779 } else if (FLAGS_transaction_db
) {
3780 TransactionDB
* ptr
= nullptr;
3781 TransactionDBOptions txn_db_options
;
3782 s
= CreateLoggerFromOptions(db_name
, options
, &options
.info_log
);
3784 s
= TransactionDB::Open(options
, txn_db_options
, db_name
, &ptr
);
3789 } else if (FLAGS_use_blob_db
) {
3790 blob_db::BlobDBOptions blob_db_options
;
3791 blob_db_options
.enable_garbage_collection
= FLAGS_blob_db_enable_gc
;
3792 blob_db_options
.is_fifo
= FLAGS_blob_db_is_fifo
;
3793 blob_db_options
.max_db_size
= FLAGS_blob_db_max_db_size
;
3794 blob_db_options
.ttl_range_secs
= FLAGS_blob_db_ttl_range_secs
;
3795 blob_db_options
.min_blob_size
= FLAGS_blob_db_min_blob_size
;
3796 blob_db_options
.bytes_per_sync
= FLAGS_blob_db_bytes_per_sync
;
3797 blob_db_options
.blob_file_size
= FLAGS_blob_db_file_size
;
3798 blob_db::BlobDB
* ptr
= nullptr;
3799 s
= blob_db::BlobDB::Open(options
, blob_db_options
, db_name
, &ptr
);
3803 #endif // ROCKSDB_LITE
3805 s
= DB::Open(options
, db_name
, &db
->db
);
3808 fprintf(stderr
, "open error: %s\n", s
.ToString().c_str());
3814 RANDOM
, SEQUENTIAL
, UNIQUE_RANDOM
// Thin benchmark dispatchers: each forwards to DoWrite /
// DoDeterministicCompact with the write-order mode named in the function.
// NOTE(review): mangled extraction -- closing braces and the second half
// of WriteUniqueRandomDeterministic's argument list (presumably
// UNIQUE_RANDOM, original lines 3823-3824) are missing from this view.
3817 void WriteSeqDeterministic(ThreadState
* thread
) {
3818 DoDeterministicCompact(thread
, open_options_
.compaction_style
, SEQUENTIAL
);
3821 void WriteUniqueRandomDeterministic(ThreadState
* thread
) {
3822 DoDeterministicCompact(thread
, open_options_
.compaction_style
,
3826 void WriteSeq(ThreadState
* thread
) {
3827 DoWrite(thread
, SEQUENTIAL
);
3830 void WriteRandom(ThreadState
* thread
) {
3831 DoWrite(thread
, RANDOM
);
3834 void WriteUniqueRandom(ThreadState
* thread
) {
3835 DoWrite(thread
, UNIQUE_RANDOM
);
3838 class KeyGenerator
{
3840 KeyGenerator(Random64
* rand
, WriteMode mode
, uint64_t num
,
3841 uint64_t /*num_per_set*/ = 64 * 1024)
3842 : rand_(rand
), mode_(mode
), num_(num
), next_(0) {
3843 if (mode_
== UNIQUE_RANDOM
) {
3844 // NOTE: if memory consumption of this approach becomes a concern,
3845 // we can either break it into pieces and only random shuffle a section
3846 // each time. Alternatively, use a bit map implementation
3847 // (https://reviews.facebook.net/differential/diff/54627/)
3848 values_
.resize(num_
);
3849 for (uint64_t i
= 0; i
< num_
; ++i
) {
3853 values_
.begin(), values_
.end(),
3854 std::default_random_engine(static_cast<unsigned int>(FLAGS_seed
)));
3863 return rand_
->Next() % num_
;
3865 assert(next_
< num_
);
3866 return values_
[next_
++];
3869 return std::numeric_limits
<uint64_t>::max();
3875 const uint64_t num_
;
3877 std::vector
<uint64_t> values_
;
// DB selection helpers: single-DB runs always use db_; multi-DB runs pick
// one entry of multi_dbs_ from a random integer (thread->rand).
// NOTE(review): mangled extraction -- the single-DB `return` branch
// (presumably `return &db_;`, original lines 3890-3891) and the closing
// braces are missing from this view.
3880 DB
* SelectDB(ThreadState
* thread
) {
3881 return SelectDBWithCfh(thread
)->db
;
3884 DBWithColumnFamilies
* SelectDBWithCfh(ThreadState
* thread
) {
3885 return SelectDBWithCfh(thread
->rand
.Next());
3888 DBWithColumnFamilies
* SelectDBWithCfh(uint64_t rand_int
) {
3889 if (db_
.db
!= nullptr) {
3892 return &multi_dbs_
[rand_int
% multi_dbs_
.size()];
// Sine-wave write-rate model: a*sin(b*x + c) + d, with the four
// coefficients supplied by the --sine_a/--sine_b/--sine_c/--sine_d flags.
// NOTE(review): the function's closing brace is missing from this
// mangled extraction.
3896 double SineRate(double x
) {
3897 return FLAGS_sine_a
*sin((FLAGS_sine_b
*x
) + FLAGS_sine_c
) + FLAGS_sine_d
;
3900 void DoWrite(ThreadState
* thread
, WriteMode write_mode
) {
3901 const int test_duration
= write_mode
== RANDOM
? FLAGS_duration
: 0;
3902 const int64_t num_ops
= writes_
== 0 ? num_
: writes_
;
3904 size_t num_key_gens
= 1;
3905 if (db_
.db
== nullptr) {
3906 num_key_gens
= multi_dbs_
.size();
3908 std::vector
<std::unique_ptr
<KeyGenerator
>> key_gens(num_key_gens
);
3909 int64_t max_ops
= num_ops
* num_key_gens
;
3910 int64_t ops_per_stage
= max_ops
;
3911 if (FLAGS_num_column_families
> 1 && FLAGS_num_hot_column_families
> 0) {
3912 ops_per_stage
= (max_ops
- 1) / (FLAGS_num_column_families
/
3913 FLAGS_num_hot_column_families
) +
3917 Duration
duration(test_duration
, max_ops
, ops_per_stage
);
3918 for (size_t i
= 0; i
< num_key_gens
; i
++) {
3919 key_gens
[i
].reset(new KeyGenerator(&(thread
->rand
), write_mode
,
3920 num_
+ max_num_range_tombstones_
,
3924 if (num_
!= FLAGS_num
) {
3926 snprintf(msg
, sizeof(msg
), "(%" PRIu64
" ops)", num_
);
3927 thread
->stats
.AddMessage(msg
);
3930 RandomGenerator gen
;
3935 std::unique_ptr
<const char[]> key_guard
;
3936 Slice key
= AllocateKey(&key_guard
);
3937 std::unique_ptr
<const char[]> begin_key_guard
;
3938 Slice begin_key
= AllocateKey(&begin_key_guard
);
3939 std::unique_ptr
<const char[]> end_key_guard
;
3940 Slice end_key
= AllocateKey(&end_key_guard
);
3941 std::vector
<std::unique_ptr
<const char[]>> expanded_key_guards
;
3942 std::vector
<Slice
> expanded_keys
;
3943 if (FLAGS_expand_range_tombstones
) {
3944 expanded_key_guards
.resize(range_tombstone_width_
);
3945 for (auto& expanded_key_guard
: expanded_key_guards
) {
3946 expanded_keys
.emplace_back(AllocateKey(&expanded_key_guard
));
3951 int64_t num_written
= 0;
3952 while (!duration
.Done(entries_per_batch_
)) {
3953 if (duration
.GetStage() != stage
) {
3954 stage
= duration
.GetStage();
3955 if (db_
.db
!= nullptr) {
3956 db_
.CreateNewCf(open_options_
, stage
);
3958 for (auto& db
: multi_dbs_
) {
3959 db
.CreateNewCf(open_options_
, stage
);
3964 size_t id
= thread
->rand
.Next() % num_key_gens
;
3965 DBWithColumnFamilies
* db_with_cfh
= SelectDBWithCfh(id
);
3968 if (thread
->shared
->write_rate_limiter
.get() != nullptr) {
3969 thread
->shared
->write_rate_limiter
->Request(
3970 entries_per_batch_
* (value_size_
+ key_size_
), Env::IO_HIGH
,
3971 nullptr /* stats */, RateLimiter::OpType::kWrite
);
3972 // Set time at which last op finished to Now() to hide latency and
3973 // sleep from rate limiter. Also, do the check once per batch, not
3975 thread
->stats
.ResetLastOpTime();
3978 for (int64_t j
= 0; j
< entries_per_batch_
; j
++) {
3979 int64_t rand_num
= key_gens
[id
]->Next();
3980 GenerateKeyFromInt(rand_num
, FLAGS_num
, &key
);
3982 #ifndef ROCKSDB_LITE
3983 Slice val
= gen
.Generate(value_size_
);
3984 int ttl
= rand() % FLAGS_blob_db_max_ttl_range
;
3985 blob_db::BlobDB
* blobdb
=
3986 static_cast<blob_db::BlobDB
*>(db_with_cfh
->db
);
3987 s
= blobdb
->PutWithTTL(write_options_
, key
, val
, ttl
);
3988 #endif // ROCKSDB_LITE
3989 } else if (FLAGS_num_column_families
<= 1) {
3990 batch
.Put(key
, gen
.Generate(value_size_
));
3992 // We use same rand_num as seed for key and column family so that we
3993 // can deterministically find the cfh corresponding to a particular
3994 // key while reading the key.
3995 batch
.Put(db_with_cfh
->GetCfh(rand_num
), key
,
3996 gen
.Generate(value_size_
));
3998 bytes
+= value_size_
+ key_size_
;
4000 if (writes_per_range_tombstone_
> 0 &&
4001 num_written
> writes_before_delete_range_
&&
4002 (num_written
- writes_before_delete_range_
) /
4003 writes_per_range_tombstone_
<=
4004 max_num_range_tombstones_
&&
4005 (num_written
- writes_before_delete_range_
) %
4006 writes_per_range_tombstone_
==
4008 int64_t begin_num
= key_gens
[id
]->Next();
4009 if (FLAGS_expand_range_tombstones
) {
4010 for (int64_t offset
= 0; offset
< range_tombstone_width_
;
4012 GenerateKeyFromInt(begin_num
+ offset
, FLAGS_num
,
4013 &expanded_keys
[offset
]);
4015 #ifndef ROCKSDB_LITE
4016 s
= db_with_cfh
->db
->Delete(write_options_
,
4017 expanded_keys
[offset
]);
4018 #endif // ROCKSDB_LITE
4019 } else if (FLAGS_num_column_families
<= 1) {
4020 batch
.Delete(expanded_keys
[offset
]);
4022 batch
.Delete(db_with_cfh
->GetCfh(rand_num
),
4023 expanded_keys
[offset
]);
4027 GenerateKeyFromInt(begin_num
, FLAGS_num
, &begin_key
);
4028 GenerateKeyFromInt(begin_num
+ range_tombstone_width_
, FLAGS_num
,
4031 #ifndef ROCKSDB_LITE
4032 s
= db_with_cfh
->db
->DeleteRange(
4033 write_options_
, db_with_cfh
->db
->DefaultColumnFamily(),
4034 begin_key
, end_key
);
4035 #endif // ROCKSDB_LITE
4036 } else if (FLAGS_num_column_families
<= 1) {
4037 batch
.DeleteRange(begin_key
, end_key
);
4039 batch
.DeleteRange(db_with_cfh
->GetCfh(rand_num
), begin_key
,
4045 if (!use_blob_db_
) {
4046 s
= db_with_cfh
->db
->Write(write_options_
, &batch
);
4048 thread
->stats
.FinishedOps(db_with_cfh
, db_with_cfh
->db
,
4049 entries_per_batch_
, kWrite
);
4050 if (FLAGS_sine_write_rate
) {
4051 uint64_t now
= FLAGS_env
->NowMicros();
4053 uint64_t usecs_since_last
;
4054 if (now
> thread
->stats
.GetSineInterval()) {
4055 usecs_since_last
= now
- thread
->stats
.GetSineInterval();
4057 usecs_since_last
= 0;
4060 if (usecs_since_last
>
4061 (FLAGS_sine_write_rate_interval_milliseconds
* uint64_t{1000})) {
4062 double usecs_since_start
=
4063 static_cast<double>(now
- thread
->stats
.GetStart());
4064 thread
->stats
.ResetSineInterval();
4065 uint64_t write_rate
=
4066 static_cast<uint64_t>(SineRate(usecs_since_start
/ 1000000.0));
4067 thread
->shared
->write_rate_limiter
.reset(
4068 NewGenericRateLimiter(write_rate
));
4072 s
= listener_
->WaitForRecovery(600000000) ? Status::OK() : s
;
4076 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
4080 thread
->stats
.AddBytes(bytes
);
4083 Status
DoDeterministicCompact(ThreadState
* thread
,
4084 CompactionStyle compaction_style
,
4085 WriteMode write_mode
) {
4086 #ifndef ROCKSDB_LITE
4087 ColumnFamilyMetaData meta
;
4088 std::vector
<DB
*> db_list
;
4089 if (db_
.db
!= nullptr) {
4090 db_list
.push_back(db_
.db
);
4092 for (auto& db
: multi_dbs_
) {
4093 db_list
.push_back(db
.db
);
4096 std::vector
<Options
> options_list
;
4097 for (auto db
: db_list
) {
4098 options_list
.push_back(db
->GetOptions());
4099 if (compaction_style
!= kCompactionStyleFIFO
) {
4100 db
->SetOptions({{"disable_auto_compactions", "1"},
4101 {"level0_slowdown_writes_trigger", "400000000"},
4102 {"level0_stop_writes_trigger", "400000000"}});
4104 db
->SetOptions({{"disable_auto_compactions", "1"}});
4108 assert(!db_list
.empty());
4109 auto num_db
= db_list
.size();
4110 size_t num_levels
= static_cast<size_t>(open_options_
.num_levels
);
4111 size_t output_level
= open_options_
.num_levels
- 1;
4112 std::vector
<std::vector
<std::vector
<SstFileMetaData
>>> sorted_runs(num_db
);
4113 std::vector
<size_t> num_files_at_level0(num_db
, 0);
4114 if (compaction_style
== kCompactionStyleLevel
) {
4115 if (num_levels
== 0) {
4116 return Status::InvalidArgument("num_levels should be larger than 1");
4118 bool should_stop
= false;
4119 while (!should_stop
) {
4120 if (sorted_runs
[0].empty()) {
4121 DoWrite(thread
, write_mode
);
4123 DoWrite(thread
, UNIQUE_RANDOM
);
4125 for (size_t i
= 0; i
< num_db
; i
++) {
4126 auto db
= db_list
[i
];
4127 db
->Flush(FlushOptions());
4128 db
->GetColumnFamilyMetaData(&meta
);
4129 if (num_files_at_level0
[i
] == meta
.levels
[0].files
.size() ||
4134 sorted_runs
[i
].emplace_back(
4135 meta
.levels
[0].files
.begin(),
4136 meta
.levels
[0].files
.end() - num_files_at_level0
[i
]);
4137 num_files_at_level0
[i
] = meta
.levels
[0].files
.size();
4138 if (sorted_runs
[i
].back().size() == 1) {
4142 if (sorted_runs
[i
].size() == output_level
) {
4143 auto& L1
= sorted_runs
[i
].back();
4144 L1
.erase(L1
.begin(), L1
.begin() + L1
.size() / 3);
4149 writes_
/= static_cast<int64_t>(open_options_
.max_bytes_for_level_multiplier
);
4151 for (size_t i
= 0; i
< num_db
; i
++) {
4152 if (sorted_runs
[i
].size() < num_levels
- 1) {
4153 fprintf(stderr
, "n is too small to fill %" ROCKSDB_PRIszt
" levels\n", num_levels
);
4157 for (size_t i
= 0; i
< num_db
; i
++) {
4158 auto db
= db_list
[i
];
4159 auto compactionOptions
= CompactionOptions();
4160 compactionOptions
.compression
= FLAGS_compression_type_e
;
4161 auto options
= db
->GetOptions();
4162 MutableCFOptions
mutable_cf_options(options
);
4163 for (size_t j
= 0; j
< sorted_runs
[i
].size(); j
++) {
4164 compactionOptions
.output_file_size_limit
=
4165 MaxFileSizeForLevel(mutable_cf_options
,
4166 static_cast<int>(output_level
), compaction_style
);
4167 std::cout
<< sorted_runs
[i
][j
].size() << std::endl
;
4168 db
->CompactFiles(compactionOptions
, {sorted_runs
[i
][j
].back().name
,
4169 sorted_runs
[i
][j
].front().name
},
4170 static_cast<int>(output_level
- j
) /*level*/);
4173 } else if (compaction_style
== kCompactionStyleUniversal
) {
4174 auto ratio
= open_options_
.compaction_options_universal
.size_ratio
;
4175 bool should_stop
= false;
4176 while (!should_stop
) {
4177 if (sorted_runs
[0].empty()) {
4178 DoWrite(thread
, write_mode
);
4180 DoWrite(thread
, UNIQUE_RANDOM
);
4182 for (size_t i
= 0; i
< num_db
; i
++) {
4183 auto db
= db_list
[i
];
4184 db
->Flush(FlushOptions());
4185 db
->GetColumnFamilyMetaData(&meta
);
4186 if (num_files_at_level0
[i
] == meta
.levels
[0].files
.size() ||
4191 sorted_runs
[i
].emplace_back(
4192 meta
.levels
[0].files
.begin(),
4193 meta
.levels
[0].files
.end() - num_files_at_level0
[i
]);
4194 num_files_at_level0
[i
] = meta
.levels
[0].files
.size();
4195 if (sorted_runs
[i
].back().size() == 1) {
4199 num_files_at_level0
[i
] = meta
.levels
[0].files
.size();
4201 writes_
= static_cast<int64_t>(writes_
* static_cast<double>(100) / (ratio
+ 200));
4203 for (size_t i
= 0; i
< num_db
; i
++) {
4204 if (sorted_runs
[i
].size() < num_levels
) {
4205 fprintf(stderr
, "n is too small to fill %" ROCKSDB_PRIszt
" levels\n", num_levels
);
4209 for (size_t i
= 0; i
< num_db
; i
++) {
4210 auto db
= db_list
[i
];
4211 auto compactionOptions
= CompactionOptions();
4212 compactionOptions
.compression
= FLAGS_compression_type_e
;
4213 auto options
= db
->GetOptions();
4214 MutableCFOptions
mutable_cf_options(options
);
4215 for (size_t j
= 0; j
< sorted_runs
[i
].size(); j
++) {
4216 compactionOptions
.output_file_size_limit
=
4217 MaxFileSizeForLevel(mutable_cf_options
,
4218 static_cast<int>(output_level
), compaction_style
);
4221 {sorted_runs
[i
][j
].back().name
, sorted_runs
[i
][j
].front().name
},
4222 (output_level
> j
? static_cast<int>(output_level
- j
)
4226 } else if (compaction_style
== kCompactionStyleFIFO
) {
4227 if (num_levels
!= 1) {
4228 return Status::InvalidArgument(
4229 "num_levels should be 1 for FIFO compaction");
4231 if (FLAGS_num_multi_db
!= 0) {
4232 return Status::InvalidArgument("Doesn't support multiDB");
4234 auto db
= db_list
[0];
4235 std::vector
<std::string
> file_names
;
4237 if (sorted_runs
[0].empty()) {
4238 DoWrite(thread
, write_mode
);
4240 DoWrite(thread
, UNIQUE_RANDOM
);
4242 db
->Flush(FlushOptions());
4243 db
->GetColumnFamilyMetaData(&meta
);
4244 auto total_size
= meta
.levels
[0].size
;
4246 db
->GetOptions().compaction_options_fifo
.max_table_files_size
) {
4247 for (auto file_meta
: meta
.levels
[0].files
) {
4248 file_names
.emplace_back(file_meta
.name
);
4253 // TODO(shuzhang1989): Investigate why CompactFiles not working
4254 // auto compactionOptions = CompactionOptions();
4255 // db->CompactFiles(compactionOptions, file_names, 0);
4256 auto compactionOptions
= CompactRangeOptions();
4257 db
->CompactRange(compactionOptions
, nullptr, nullptr);
4260 "%-12s : skipped (-compaction_stype=kCompactionStyleNone)\n",
4261 "filldeterministic");
4262 return Status::InvalidArgument("None compaction is not supported");
4265 // Verify seqno and key range
4266 // Note: the seqno get changed at the max level by implementation
4267 // optimization, so skip the check of the max level.
4269 for (size_t k
= 0; k
< num_db
; k
++) {
4270 auto db
= db_list
[k
];
4271 db
->GetColumnFamilyMetaData(&meta
);
4272 // verify the number of sorted runs
4273 if (compaction_style
== kCompactionStyleLevel
) {
4274 assert(num_levels
- 1 == sorted_runs
[k
].size());
4275 } else if (compaction_style
== kCompactionStyleUniversal
) {
4276 assert(meta
.levels
[0].files
.size() + num_levels
- 1 ==
4277 sorted_runs
[k
].size());
4278 } else if (compaction_style
== kCompactionStyleFIFO
) {
4279 // TODO(gzh): FIFO compaction
4280 db
->GetColumnFamilyMetaData(&meta
);
4281 auto total_size
= meta
.levels
[0].size
;
4282 assert(total_size
<=
4283 db
->GetOptions().compaction_options_fifo
.max_table_files_size
);
4287 // verify smallest/largest seqno and key range of each sorted run
4288 auto max_level
= num_levels
- 1;
4290 for (size_t i
= 0; i
< sorted_runs
[k
].size(); i
++) {
4291 level
= static_cast<int>(max_level
- i
);
4292 SequenceNumber sorted_run_smallest_seqno
= kMaxSequenceNumber
;
4293 SequenceNumber sorted_run_largest_seqno
= 0;
4294 std::string sorted_run_smallest_key
, sorted_run_largest_key
;
4295 bool first_key
= true;
4296 for (auto fileMeta
: sorted_runs
[k
][i
]) {
4297 sorted_run_smallest_seqno
=
4298 std::min(sorted_run_smallest_seqno
, fileMeta
.smallest_seqno
);
4299 sorted_run_largest_seqno
=
4300 std::max(sorted_run_largest_seqno
, fileMeta
.largest_seqno
);
4302 db
->DefaultColumnFamily()->GetComparator()->Compare(
4303 fileMeta
.smallestkey
, sorted_run_smallest_key
) < 0) {
4304 sorted_run_smallest_key
= fileMeta
.smallestkey
;
4307 db
->DefaultColumnFamily()->GetComparator()->Compare(
4308 fileMeta
.largestkey
, sorted_run_largest_key
) > 0) {
4309 sorted_run_largest_key
= fileMeta
.largestkey
;
4313 if (compaction_style
== kCompactionStyleLevel
||
4314 (compaction_style
== kCompactionStyleUniversal
&& level
> 0)) {
4315 SequenceNumber level_smallest_seqno
= kMaxSequenceNumber
;
4316 SequenceNumber level_largest_seqno
= 0;
4317 for (auto fileMeta
: meta
.levels
[level
].files
) {
4318 level_smallest_seqno
=
4319 std::min(level_smallest_seqno
, fileMeta
.smallest_seqno
);
4320 level_largest_seqno
=
4321 std::max(level_largest_seqno
, fileMeta
.largest_seqno
);
4323 assert(sorted_run_smallest_key
==
4324 meta
.levels
[level
].files
.front().smallestkey
);
4325 assert(sorted_run_largest_key
==
4326 meta
.levels
[level
].files
.back().largestkey
);
4327 if (level
!= static_cast<int>(max_level
)) {
4328 // compaction at max_level would change sequence number
4329 assert(sorted_run_smallest_seqno
== level_smallest_seqno
);
4330 assert(sorted_run_largest_seqno
== level_largest_seqno
);
4332 } else if (compaction_style
== kCompactionStyleUniversal
) {
4333 // level <= 0 means sorted runs on level 0
4335 meta
.levels
[0].files
[sorted_runs
[k
].size() - 1 - i
];
4336 assert(sorted_run_smallest_key
== level0_file
.smallestkey
);
4337 assert(sorted_run_largest_key
== level0_file
.largestkey
);
4338 if (level
!= static_cast<int>(max_level
)) {
4339 assert(sorted_run_smallest_seqno
== level0_file
.smallest_seqno
);
4340 assert(sorted_run_largest_seqno
== level0_file
.largest_seqno
);
4346 // print the size of each sorted_run
4347 for (size_t k
= 0; k
< num_db
; k
++) {
4348 auto db
= db_list
[k
];
4350 "---------------------- DB %" ROCKSDB_PRIszt
" LSM ---------------------\n", k
);
4351 db
->GetColumnFamilyMetaData(&meta
);
4352 for (auto& levelMeta
: meta
.levels
) {
4353 if (levelMeta
.files
.empty()) {
4356 if (levelMeta
.level
== 0) {
4357 for (auto& fileMeta
: levelMeta
.files
) {
4358 fprintf(stdout
, "Level[%d]: %s(size: %" ROCKSDB_PRIszt
" bytes)\n",
4359 levelMeta
.level
, fileMeta
.name
.c_str(), fileMeta
.size
);
4362 fprintf(stdout
, "Level[%d]: %s - %s(total size: %" PRIi64
" bytes)\n",
4363 levelMeta
.level
, levelMeta
.files
.front().name
.c_str(),
4364 levelMeta
.files
.back().name
.c_str(), levelMeta
.size
);
4368 for (size_t i
= 0; i
< num_db
; i
++) {
4369 db_list
[i
]->SetOptions(
4370 {{"disable_auto_compactions",
4371 std::to_string(options_list
[i
].disable_auto_compactions
)},
4372 {"level0_slowdown_writes_trigger",
4373 std::to_string(options_list
[i
].level0_slowdown_writes_trigger
)},
4374 {"level0_stop_writes_trigger",
4375 std::to_string(options_list
[i
].level0_stop_writes_trigger
)}});
4377 return Status::OK();
4380 (void)compaction_style
;
4382 fprintf(stderr
, "Rocksdb Lite doesn't support filldeterministic\n");
4383 return Status::NotSupported(
4384 "Rocksdb Lite doesn't support filldeterministic");
4385 #endif // ROCKSDB_LITE
4388 void ReadSequential(ThreadState
* thread
) {
4389 if (db_
.db
!= nullptr) {
4390 ReadSequential(thread
, db_
.db
);
4392 for (const auto& db_with_cfh
: multi_dbs_
) {
4393 ReadSequential(thread
, db_with_cfh
.db
);
4398 void ReadSequential(ThreadState
* thread
, DB
* db
) {
4399 ReadOptions
options(FLAGS_verify_checksum
, true);
4400 options
.tailing
= FLAGS_use_tailing_iterator
;
4402 Iterator
* iter
= db
->NewIterator(options
);
4405 for (iter
->SeekToFirst(); i
< reads_
&& iter
->Valid(); iter
->Next()) {
4406 bytes
+= iter
->key().size() + iter
->value().size();
4407 thread
->stats
.FinishedOps(nullptr, db
, 1, kRead
);
4410 if (thread
->shared
->read_rate_limiter
.get() != nullptr &&
4412 thread
->shared
->read_rate_limiter
->Request(1024, Env::IO_HIGH
,
4413 nullptr /* stats */,
4414 RateLimiter::OpType::kRead
);
4419 thread
->stats
.AddBytes(bytes
);
4420 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
4421 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
4422 get_perf_context()->ToString());
4426 void ReadReverse(ThreadState
* thread
) {
4427 if (db_
.db
!= nullptr) {
4428 ReadReverse(thread
, db_
.db
);
4430 for (const auto& db_with_cfh
: multi_dbs_
) {
4431 ReadReverse(thread
, db_with_cfh
.db
);
4436 void ReadReverse(ThreadState
* thread
, DB
* db
) {
4437 Iterator
* iter
= db
->NewIterator(ReadOptions(FLAGS_verify_checksum
, true));
4440 for (iter
->SeekToLast(); i
< reads_
&& iter
->Valid(); iter
->Prev()) {
4441 bytes
+= iter
->key().size() + iter
->value().size();
4442 thread
->stats
.FinishedOps(nullptr, db
, 1, kRead
);
4444 if (thread
->shared
->read_rate_limiter
.get() != nullptr &&
4446 thread
->shared
->read_rate_limiter
->Request(1024, Env::IO_HIGH
,
4447 nullptr /* stats */,
4448 RateLimiter::OpType::kRead
);
4452 thread
->stats
.AddBytes(bytes
);
4455 void ReadRandomFast(ThreadState
* thread
) {
4458 int64_t nonexist
= 0;
4459 ReadOptions
options(FLAGS_verify_checksum
, true);
4460 std::unique_ptr
<const char[]> key_guard
;
4461 Slice key
= AllocateKey(&key_guard
);
4463 DB
* db
= SelectDBWithCfh(thread
)->db
;
4466 while (pot
< FLAGS_num
) {
4470 Duration
duration(FLAGS_duration
, reads_
);
4472 for (int i
= 0; i
< 100; ++i
) {
4473 int64_t key_rand
= thread
->rand
.Next() & (pot
- 1);
4474 GenerateKeyFromInt(key_rand
, FLAGS_num
, &key
);
4476 auto status
= db
->Get(options
, key
, &value
);
4479 } else if (!status
.IsNotFound()) {
4480 fprintf(stderr
, "Get returned an error: %s\n",
4481 status
.ToString().c_str());
4484 if (key_rand
>= FLAGS_num
) {
4488 if (thread
->shared
->read_rate_limiter
.get() != nullptr) {
4489 thread
->shared
->read_rate_limiter
->Request(
4490 100, Env::IO_HIGH
, nullptr /* stats */, RateLimiter::OpType::kRead
);
4493 thread
->stats
.FinishedOps(nullptr, db
, 100, kRead
);
4494 } while (!duration
.Done(100));
4497 snprintf(msg
, sizeof(msg
), "(%" PRIu64
" of %" PRIu64
" found, "
4498 "issued %" PRIu64
" non-exist keys)\n",
4499 found
, read
, nonexist
);
4501 thread
->stats
.AddMessage(msg
);
4503 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
4504 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
4505 get_perf_context()->ToString());
4509 int64_t GetRandomKey(Random64
* rand
) {
4510 uint64_t rand_int
= rand
->Next();
4512 if (read_random_exp_range_
== 0) {
4513 key_rand
= rand_int
% FLAGS_num
;
4515 const uint64_t kBigInt
= static_cast<uint64_t>(1U) << 62;
4516 long double order
= -static_cast<long double>(rand_int
% kBigInt
) /
4517 static_cast<long double>(kBigInt
) *
4518 read_random_exp_range_
;
4519 long double exp_ran
= std::exp(order
);
4521 static_cast<int64_t>(exp_ran
* static_cast<long double>(FLAGS_num
));
4522 // Map to a different number to avoid locality.
4523 const uint64_t kBigPrime
= 0x5bd1e995;
4524 // Overflow is like %(2^64). Will have little impact of results.
4525 key_rand
= static_cast<int64_t>((rand_num
* kBigPrime
) % FLAGS_num
);
4530 void ReadRandom(ThreadState
* thread
) {
4534 ReadOptions
options(FLAGS_verify_checksum
, true);
4535 std::unique_ptr
<const char[]> key_guard
;
4536 Slice key
= AllocateKey(&key_guard
);
4537 PinnableSlice pinnable_val
;
4539 Duration
duration(FLAGS_duration
, reads_
);
4540 while (!duration
.Done(1)) {
4541 DBWithColumnFamilies
* db_with_cfh
= SelectDBWithCfh(thread
);
4542 // We use same key_rand as seed for key and column family so that we can
4543 // deterministically find the cfh corresponding to a particular key, as it
4544 // is done in DoWrite method.
4545 int64_t key_rand
= GetRandomKey(&thread
->rand
);
4546 GenerateKeyFromInt(key_rand
, FLAGS_num
, &key
);
4549 if (FLAGS_num_column_families
> 1) {
4550 s
= db_with_cfh
->db
->Get(options
, db_with_cfh
->GetCfh(key_rand
), key
,
4553 pinnable_val
.Reset();
4554 s
= db_with_cfh
->db
->Get(options
,
4555 db_with_cfh
->db
->DefaultColumnFamily(), key
,
4560 bytes
+= key
.size() + pinnable_val
.size();
4561 } else if (!s
.IsNotFound()) {
4562 fprintf(stderr
, "Get returned an error: %s\n", s
.ToString().c_str());
4566 if (thread
->shared
->read_rate_limiter
.get() != nullptr &&
4567 read
% 256 == 255) {
4568 thread
->shared
->read_rate_limiter
->Request(
4569 256, Env::IO_HIGH
, nullptr /* stats */, RateLimiter::OpType::kRead
);
4572 thread
->stats
.FinishedOps(db_with_cfh
, db_with_cfh
->db
, 1, kRead
);
4576 snprintf(msg
, sizeof(msg
), "(%" PRIu64
" of %" PRIu64
" found)\n",
4579 thread
->stats
.AddBytes(bytes
);
4580 thread
->stats
.AddMessage(msg
);
4582 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
4583 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
4584 get_perf_context()->ToString());
4588 // Calls MultiGet over a list of keys from a random distribution.
4589 // Returns the total number of keys found.
4590 void MultiReadRandom(ThreadState
* thread
) {
4592 int64_t num_multireads
= 0;
4594 ReadOptions
options(FLAGS_verify_checksum
, true);
4595 std::vector
<Slice
> keys
;
4596 std::vector
<std::unique_ptr
<const char[]> > key_guards
;
4597 std::vector
<std::string
> values(entries_per_batch_
);
4598 while (static_cast<int64_t>(keys
.size()) < entries_per_batch_
) {
4599 key_guards
.push_back(std::unique_ptr
<const char[]>());
4600 keys
.push_back(AllocateKey(&key_guards
.back()));
4603 Duration
duration(FLAGS_duration
, reads_
);
4604 while (!duration
.Done(1)) {
4605 DB
* db
= SelectDB(thread
);
4606 for (int64_t i
= 0; i
< entries_per_batch_
; ++i
) {
4607 GenerateKeyFromInt(GetRandomKey(&thread
->rand
), FLAGS_num
, &keys
[i
]);
4609 std::vector
<Status
> statuses
= db
->MultiGet(options
, keys
, &values
);
4610 assert(static_cast<int64_t>(statuses
.size()) == entries_per_batch_
);
4612 read
+= entries_per_batch_
;
4614 for (int64_t i
= 0; i
< entries_per_batch_
; ++i
) {
4615 if (statuses
[i
].ok()) {
4617 } else if (!statuses
[i
].IsNotFound()) {
4618 fprintf(stderr
, "MultiGet returned an error: %s\n",
4619 statuses
[i
].ToString().c_str());
4623 if (thread
->shared
->read_rate_limiter
.get() != nullptr &&
4624 num_multireads
% 256 == 255) {
4625 thread
->shared
->read_rate_limiter
->Request(
4626 256 * entries_per_batch_
, Env::IO_HIGH
, nullptr /* stats */,
4627 RateLimiter::OpType::kRead
);
4629 thread
->stats
.FinishedOps(nullptr, db
, entries_per_batch_
, kRead
);
4633 snprintf(msg
, sizeof(msg
), "(%" PRIu64
" of %" PRIu64
" found)",
4635 thread
->stats
.AddMessage(msg
);
4638 // THe reverse function of Pareto function
4639 int64_t ParetoCdfInversion(double u
, double theta
, double k
, double sigma
) {
4642 ret
= theta
- sigma
* std::log(u
);
4644 ret
= theta
+ sigma
* (std::pow(u
, -1 * k
) - 1) / k
;
4646 return static_cast<int64_t>(ceil(ret
));
4648 // inversion of y=ax^b
4649 int64_t PowerCdfInversion(double u
, double a
, double b
) {
4651 ret
= std::pow((u
/ a
), (1 / b
));
4652 return static_cast<int64_t>(ceil(ret
));
4655 // Add the noice to the QPS
4656 double AddNoise(double origin
, double noise_ratio
) {
4657 if (noise_ratio
< 0.0 || noise_ratio
> 1.0) {
4660 int band_int
= static_cast<int>(FLAGS_sine_a
);
4661 double delta
= (rand() % band_int
- band_int
/ 2) * noise_ratio
;
4662 if (origin
+ delta
< 0) {
4665 return (origin
+ delta
);
4669 // decide the query type
4670 // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
4671 class QueryDecider
{
4673 std::vector
<int> type_
;
4674 std::vector
<double> ratio_
;
4680 Status
Initiate(std::vector
<double> ratio_input
) {
4681 int range_max
= 1000;
4683 for (auto& ratio
: ratio_input
) {
4687 for (auto& ratio
: ratio_input
) {
4688 range_
+= static_cast<int>(ceil(range_max
* (ratio
/ sum
)));
4689 type_
.push_back(range_
);
4690 ratio_
.push_back(ratio
/ sum
);
4692 return Status::OK();
4695 int GetType(int64_t rand_num
) {
4697 rand_num
= rand_num
* (-1);
4699 assert(range_
!= 0);
4700 int pos
= static_cast<int>(rand_num
% range_
);
4701 for (int i
= 0; i
< static_cast<int>(type_
.size()); i
++) {
4702 if (pos
< type_
[i
]) {
4710 // The graph wokrload mixed with Get, Put, Iterator
4711 void MixGraph(ThreadState
* thread
) {
4712 int64_t read
= 0; // including single gets and Next of iterators
4717 int64_t seek_found
= 0;
4719 const int64_t default_value_max
= 1 * 1024 * 1024;
4720 int64_t value_max
= default_value_max
;
4721 int64_t scan_len_max
= FLAGS_mix_max_scan_len
;
4722 double write_rate
= 1000000.0;
4723 double read_rate
= 1000000.0;
4724 std::vector
<double> ratio
{FLAGS_mix_get_ratio
, FLAGS_mix_put_ratio
,
4725 FLAGS_mix_seek_ratio
};
4726 char value_buffer
[default_value_max
];
4728 RandomGenerator gen
;
4730 if (value_max
> FLAGS_mix_max_value_size
) {
4731 value_max
= FLAGS_mix_max_value_size
;
4734 ReadOptions
options(FLAGS_verify_checksum
, true);
4735 std::unique_ptr
<const char[]> key_guard
;
4736 Slice key
= AllocateKey(&key_guard
);
4737 PinnableSlice pinnable_val
;
4738 query
.Initiate(ratio
);
4740 // the limit of qps initiation
4741 if (FLAGS_sine_a
!= 0 || FLAGS_sine_d
!= 0) {
4742 thread
->shared
->read_rate_limiter
.reset(NewGenericRateLimiter(
4743 read_rate
, 100000 /* refill_period_us */, 10 /* fairness */,
4744 RateLimiter::Mode::kReadsOnly
));
4745 thread
->shared
->write_rate_limiter
.reset(
4746 NewGenericRateLimiter(write_rate
));
4749 Duration
duration(FLAGS_duration
, reads_
);
4750 while (!duration
.Done(1)) {
4751 DBWithColumnFamilies
* db_with_cfh
= SelectDBWithCfh(thread
);
4752 int64_t rand_v
, key_rand
, key_seed
;
4753 rand_v
= GetRandomKey(&thread
->rand
) % FLAGS_num
;
4754 double u
= static_cast<double>(rand_v
) / FLAGS_num
;
4755 key_seed
= PowerCdfInversion(u
, FLAGS_key_dist_a
, FLAGS_key_dist_b
);
4756 Random64
rand(key_seed
);
4757 key_rand
= static_cast<int64_t>(rand
.Next()) % FLAGS_num
;
4758 GenerateKeyFromInt(key_rand
, FLAGS_num
, &key
);
4759 int query_type
= query
.GetType(rand_v
);
4762 uint64_t now
= FLAGS_env
->NowMicros();
4763 uint64_t usecs_since_last
;
4764 if (now
> thread
->stats
.GetSineInterval()) {
4765 usecs_since_last
= now
- thread
->stats
.GetSineInterval();
4767 usecs_since_last
= 0;
4770 if (usecs_since_last
>
4771 (FLAGS_sine_mix_rate_interval_milliseconds
* uint64_t{1000})) {
4772 double usecs_since_start
=
4773 static_cast<double>(now
- thread
->stats
.GetStart());
4774 thread
->stats
.ResetSineInterval();
4775 double mix_rate_with_noise
= AddNoise(
4776 SineRate(usecs_since_start
/ 1000000.0), FLAGS_sine_mix_rate_noise
);
4777 read_rate
= mix_rate_with_noise
* (query
.ratio_
[0] + query
.ratio_
[2]);
4779 mix_rate_with_noise
* query
.ratio_
[1] * FLAGS_mix_ave_kv_size
;
4781 thread
->shared
->write_rate_limiter
.reset(
4782 NewGenericRateLimiter(write_rate
));
4783 thread
->shared
->read_rate_limiter
.reset(NewGenericRateLimiter(
4785 FLAGS_sine_mix_rate_interval_milliseconds
* uint64_t{1000}, 10,
4786 RateLimiter::Mode::kReadsOnly
));
4789 if (query_type
== 0) {
4793 if (FLAGS_num_column_families
> 1) {
4794 s
= db_with_cfh
->db
->Get(options
, db_with_cfh
->GetCfh(key_rand
), key
,
4797 pinnable_val
.Reset();
4798 s
= db_with_cfh
->db
->Get(options
,
4799 db_with_cfh
->db
->DefaultColumnFamily(), key
,
4805 bytes
+= key
.size() + pinnable_val
.size();
4806 } else if (!s
.IsNotFound()) {
4807 fprintf(stderr
, "Get returned an error: %s\n", s
.ToString().c_str());
4811 if (thread
->shared
->read_rate_limiter
.get() != nullptr &&
4812 read
% 256 == 255) {
4813 thread
->shared
->read_rate_limiter
->Request(
4814 256, Env::IO_HIGH
, nullptr /* stats */,
4815 RateLimiter::OpType::kRead
);
4817 thread
->stats
.FinishedOps(db_with_cfh
, db_with_cfh
->db
, 1, kRead
);
4818 } else if (query_type
== 1) {
4821 int64_t value_size
= ParetoCdfInversion(
4822 u
, FLAGS_value_theta
, FLAGS_value_k
, FLAGS_value_sigma
);
4823 if (value_size
< 0) {
4825 } else if (value_size
> value_max
) {
4826 value_size
= value_size
% value_max
;
4828 s
= db_with_cfh
->db
->Put(
4829 write_options_
, key
,
4830 gen
.Generate(static_cast<unsigned int>(value_size
)));
4832 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
4836 if (thread
->shared
->write_rate_limiter
) {
4837 thread
->shared
->write_rate_limiter
->Request(
4838 key
.size() + value_size
, Env::IO_HIGH
, nullptr /*stats*/,
4839 RateLimiter::OpType::kWrite
);
4841 thread
->stats
.FinishedOps(db_with_cfh
, db_with_cfh
->db
, 1, kWrite
);
4842 } else if (query_type
== 2) {
4844 if (db_with_cfh
->db
!= nullptr) {
4845 Iterator
* single_iter
= nullptr;
4846 single_iter
= db_with_cfh
->db
->NewIterator(options
);
4847 if (single_iter
!= nullptr) {
4848 single_iter
->Seek(key
);
4851 if (single_iter
->Valid() && single_iter
->key().compare(key
) == 0) {
4854 int64_t scan_length
=
4855 ParetoCdfInversion(u
, FLAGS_iter_theta
, FLAGS_iter_k
,
4858 for (int64_t j
= 0; j
< scan_length
&& single_iter
->Valid(); j
++) {
4859 Slice value
= single_iter
->value();
4860 memcpy(value_buffer
, value
.data(),
4861 std::min(value
.size(), sizeof(value_buffer
)));
4862 bytes
+= single_iter
->key().size() + single_iter
->value().size();
4863 single_iter
->Next();
4864 assert(single_iter
->status().ok());
4869 thread
->stats
.FinishedOps(db_with_cfh
, db_with_cfh
->db
, 1, kSeek
);
4873 snprintf(msg
, sizeof(msg
),
4874 "( Gets:%" PRIu64
" Puts:%" PRIu64
" Seek:%" PRIu64
" of %" PRIu64
4875 " in %" PRIu64
" found)\n",
4876 gets
, puts
, seek
, found
, read
);
4878 thread
->stats
.AddBytes(bytes
);
4879 thread
->stats
.AddMessage(msg
);
4881 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
4882 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
4883 get_perf_context()->ToString());
4887 void IteratorCreation(ThreadState
* thread
) {
4888 Duration
duration(FLAGS_duration
, reads_
);
4889 ReadOptions
options(FLAGS_verify_checksum
, true);
4890 while (!duration
.Done(1)) {
4891 DB
* db
= SelectDB(thread
);
4892 Iterator
* iter
= db
->NewIterator(options
);
4894 thread
->stats
.FinishedOps(nullptr, db
, 1, kOthers
);
4898 void IteratorCreationWhileWriting(ThreadState
* thread
) {
4899 if (thread
->tid
> 0) {
4900 IteratorCreation(thread
);
4902 BGWriter(thread
, kWrite
);
4906 void SeekRandom(ThreadState
* thread
) {
4910 ReadOptions
options(FLAGS_verify_checksum
, true);
4911 options
.tailing
= FLAGS_use_tailing_iterator
;
4913 Iterator
* single_iter
= nullptr;
4914 std::vector
<Iterator
*> multi_iters
;
4915 if (db_
.db
!= nullptr) {
4916 single_iter
= db_
.db
->NewIterator(options
);
4918 for (const auto& db_with_cfh
: multi_dbs_
) {
4919 multi_iters
.push_back(db_with_cfh
.db
->NewIterator(options
));
4923 std::unique_ptr
<const char[]> key_guard
;
4924 Slice key
= AllocateKey(&key_guard
);
4926 std::unique_ptr
<const char[]> upper_bound_key_guard
;
4927 Slice upper_bound
= AllocateKey(&upper_bound_key_guard
);
4928 std::unique_ptr
<const char[]> lower_bound_key_guard
;
4929 Slice lower_bound
= AllocateKey(&lower_bound_key_guard
);
4931 Duration
duration(FLAGS_duration
, reads_
);
4932 char value_buffer
[256];
4933 while (!duration
.Done(1)) {
4934 int64_t seek_pos
= thread
->rand
.Next() % FLAGS_num
;
4935 GenerateKeyFromInt((uint64_t)seek_pos
, FLAGS_num
, &key
);
4936 if (FLAGS_max_scan_distance
!= 0) {
4937 if (FLAGS_reverse_iterator
) {
4939 static_cast<uint64_t>(std::max(
4940 static_cast<int64_t>(0), seek_pos
- FLAGS_max_scan_distance
)),
4941 FLAGS_num
, &lower_bound
);
4942 options
.iterate_lower_bound
= &lower_bound
;
4945 (uint64_t)std::min(FLAGS_num
, seek_pos
+ FLAGS_max_scan_distance
),
4946 FLAGS_num
, &upper_bound
);
4947 options
.iterate_upper_bound
= &upper_bound
;
4951 if (!FLAGS_use_tailing_iterator
) {
4952 if (db_
.db
!= nullptr) {
4954 single_iter
= db_
.db
->NewIterator(options
);
4956 for (auto iter
: multi_iters
) {
4959 multi_iters
.clear();
4960 for (const auto& db_with_cfh
: multi_dbs_
) {
4961 multi_iters
.push_back(db_with_cfh
.db
->NewIterator(options
));
4965 // Pick a Iterator to use
4966 Iterator
* iter_to_use
= single_iter
;
4967 if (single_iter
== nullptr) {
4968 iter_to_use
= multi_iters
[thread
->rand
.Next() % multi_iters
.size()];
4971 iter_to_use
->Seek(key
);
4973 if (iter_to_use
->Valid() && iter_to_use
->key().compare(key
) == 0) {
4977 for (int j
= 0; j
< FLAGS_seek_nexts
&& iter_to_use
->Valid(); ++j
) {
4978 // Copy out iterator's value to make sure we read them.
4979 Slice value
= iter_to_use
->value();
4980 memcpy(value_buffer
, value
.data(),
4981 std::min(value
.size(), sizeof(value_buffer
)));
4982 bytes
+= iter_to_use
->key().size() + iter_to_use
->value().size();
4984 if (!FLAGS_reverse_iterator
) {
4985 iter_to_use
->Next();
4987 iter_to_use
->Prev();
4989 assert(iter_to_use
->status().ok());
4992 if (thread
->shared
->read_rate_limiter
.get() != nullptr &&
4993 read
% 256 == 255) {
4994 thread
->shared
->read_rate_limiter
->Request(
4995 256, Env::IO_HIGH
, nullptr /* stats */, RateLimiter::OpType::kRead
);
4998 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kSeek
);
5001 for (auto iter
: multi_iters
) {
5006 snprintf(msg
, sizeof(msg
), "(%" PRIu64
" of %" PRIu64
" found)\n",
5008 thread
->stats
.AddBytes(bytes
);
5009 thread
->stats
.AddMessage(msg
);
5010 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
5011 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
5012 get_perf_context()->ToString());
5016 void SeekRandomWhileWriting(ThreadState
* thread
) {
5017 if (thread
->tid
> 0) {
5020 BGWriter(thread
, kWrite
);
5024 void SeekRandomWhileMerging(ThreadState
* thread
) {
5025 if (thread
->tid
> 0) {
5028 BGWriter(thread
, kMerge
);
5032 void DoDelete(ThreadState
* thread
, bool seq
) {
5034 Duration
duration(seq
? 0 : FLAGS_duration
, deletes_
);
5036 std::unique_ptr
<const char[]> key_guard
;
5037 Slice key
= AllocateKey(&key_guard
);
5039 while (!duration
.Done(entries_per_batch_
)) {
5040 DB
* db
= SelectDB(thread
);
5042 for (int64_t j
= 0; j
< entries_per_batch_
; ++j
) {
5043 const int64_t k
= seq
? i
+ j
: (thread
->rand
.Next() % FLAGS_num
);
5044 GenerateKeyFromInt(k
, FLAGS_num
, &key
);
5047 auto s
= db
->Write(write_options_
, &batch
);
5048 thread
->stats
.FinishedOps(nullptr, db
, entries_per_batch_
, kDelete
);
5050 fprintf(stderr
, "del error: %s\n", s
.ToString().c_str());
5053 i
+= entries_per_batch_
;
5057 void DeleteSeq(ThreadState
* thread
) {
5058 DoDelete(thread
, true);
5061 void DeleteRandom(ThreadState
* thread
) {
5062 DoDelete(thread
, false);
5065 void ReadWhileWriting(ThreadState
* thread
) {
5066 if (thread
->tid
> 0) {
5069 BGWriter(thread
, kWrite
);
5073 void ReadWhileMerging(ThreadState
* thread
) {
5074 if (thread
->tid
> 0) {
5077 BGWriter(thread
, kMerge
);
5081 void BGWriter(ThreadState
* thread
, enum OperationType write_merge
) {
5082 // Special thread that keeps writing until other threads are done.
5083 RandomGenerator gen
;
5086 std::unique_ptr
<RateLimiter
> write_rate_limiter
;
5087 if (FLAGS_benchmark_write_rate_limit
> 0) {
5088 write_rate_limiter
.reset(
5089 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit
));
5092 // Don't merge stats from this thread with the readers.
5093 thread
->stats
.SetExcludeFromMerge();
5095 std::unique_ptr
<const char[]> key_guard
;
5096 Slice key
= AllocateKey(&key_guard
);
5097 uint32_t written
= 0;
5098 bool hint_printed
= false;
5101 DB
* db
= SelectDB(thread
);
5103 MutexLock
l(&thread
->shared
->mu
);
5104 if (FLAGS_finish_after_writes
&& written
== writes_
) {
5105 fprintf(stderr
, "Exiting the writer after %u writes...\n", written
);
5108 if (thread
->shared
->num_done
+ 1 >= thread
->shared
->num_initialized
) {
5109 // Other threads have finished
5110 if (FLAGS_finish_after_writes
) {
5111 // Wait for the writes to be finished
5112 if (!hint_printed
) {
5113 fprintf(stderr
, "Reads are finished. Have %d more writes to do\n",
5114 (int)writes_
- written
);
5115 hint_printed
= true;
5118 // Finish the write immediately
5124 GenerateKeyFromInt(thread
->rand
.Next() % FLAGS_num
, FLAGS_num
, &key
);
5127 if (write_merge
== kWrite
) {
5128 s
= db
->Put(write_options_
, key
, gen
.Generate(value_size_
));
5130 s
= db
->Merge(write_options_
, key
, gen
.Generate(value_size_
));
5135 fprintf(stderr
, "put or merge error: %s\n", s
.ToString().c_str());
5138 bytes
+= key
.size() + value_size_
;
5139 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kWrite
);
5141 if (FLAGS_benchmark_write_rate_limit
> 0) {
5142 write_rate_limiter
->Request(
5143 entries_per_batch_
* (value_size_
+ key_size_
), Env::IO_HIGH
,
5144 nullptr /* stats */, RateLimiter::OpType::kWrite
);
5147 thread
->stats
.AddBytes(bytes
);
5150 void ReadWhileScanning(ThreadState
* thread
) {
5151 if (thread
->tid
> 0) {
5158 void BGScan(ThreadState
* thread
) {
5159 if (FLAGS_num_multi_db
> 0) {
5160 fprintf(stderr
, "Not supporting multiple DBs.\n");
5163 assert(db_
.db
!= nullptr);
5164 ReadOptions read_options
;
5165 Iterator
* iter
= db_
.db
->NewIterator(read_options
);
5167 fprintf(stderr
, "num reads to do %" PRIu64
"\n", reads_
);
5168 Duration
duration(FLAGS_duration
, reads_
);
5169 uint64_t num_seek_to_first
= 0;
5170 uint64_t num_next
= 0;
5171 while (!duration
.Done(1)) {
5172 if (!iter
->Valid()) {
5173 iter
->SeekToFirst();
5174 num_seek_to_first
++;
5175 } else if (!iter
->status().ok()) {
5176 fprintf(stderr
, "Iterator error: %s\n",
5177 iter
->status().ToString().c_str());
5184 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kSeek
);
5189 // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
5190 // in DB atomically i.e in a single batch. Also refer GetMany.
5191 Status
PutMany(DB
* db
, const WriteOptions
& writeoptions
, const Slice
& key
,
5192 const Slice
& value
) {
5193 std::string suffixes
[3] = {"2", "1", "0"};
5194 std::string keys
[3];
5198 for (int i
= 0; i
< 3; i
++) {
5199 keys
[i
] = key
.ToString() + suffixes
[i
];
5200 batch
.Put(keys
[i
], value
);
5203 s
= db
->Write(writeoptions
, &batch
);
5208 // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
5209 // in DB atomically i.e in a single batch. Also refer GetMany.
5210 Status
DeleteMany(DB
* db
, const WriteOptions
& writeoptions
,
5212 std::string suffixes
[3] = {"1", "2", "0"};
5213 std::string keys
[3];
5217 for (int i
= 0; i
< 3; i
++) {
5218 keys
[i
] = key
.ToString() + suffixes
[i
];
5219 batch
.Delete(keys
[i
]);
5222 s
= db
->Write(writeoptions
, &batch
);
5226 // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
5227 // in the same snapshot, and verifies that all the values are identical.
5228 // ASSUMES that PutMany was used to put (K, V) into the DB.
5229 Status
GetMany(DB
* db
, const ReadOptions
& readoptions
, const Slice
& key
,
5230 std::string
* value
) {
5231 std::string suffixes
[3] = {"0", "1", "2"};
5232 std::string keys
[3];
5233 Slice key_slices
[3];
5234 std::string values
[3];
5235 ReadOptions readoptionscopy
= readoptions
;
5236 readoptionscopy
.snapshot
= db
->GetSnapshot();
5238 for (int i
= 0; i
< 3; i
++) {
5239 keys
[i
] = key
.ToString() + suffixes
[i
];
5240 key_slices
[i
] = keys
[i
];
5241 s
= db
->Get(readoptionscopy
, key_slices
[i
], value
);
5242 if (!s
.ok() && !s
.IsNotFound()) {
5243 fprintf(stderr
, "get error: %s\n", s
.ToString().c_str());
5245 // we continue after error rather than exiting so that we can
5246 // find more errors if any
5247 } else if (s
.IsNotFound()) {
5253 db
->ReleaseSnapshot(readoptionscopy
.snapshot
);
5255 if ((values
[0] != values
[1]) || (values
[1] != values
[2])) {
5256 fprintf(stderr
, "inconsistent values for key %s: %s, %s, %s\n",
5257 key
.ToString().c_str(), values
[0].c_str(), values
[1].c_str(),
5259 // we continue after error rather than exiting so that we can
5260 // find more errors if any
5266 // Differs from readrandomwriterandom in the following ways:
5267 // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
5268 // (b) Does deletes as well (per FLAGS_deletepercent)
5269 // (c) In order to achieve high % of 'found' during lookups, and to do
5270 // multiple writes (including puts and deletes) it uses upto
5271 // FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
5272 // (d) Does not have a MultiGet option.
5273 void RandomWithVerify(ThreadState
* thread
) {
5274 ReadOptions
options(FLAGS_verify_checksum
, true);
5275 RandomGenerator gen
;
5280 int delete_weight
= 0;
5281 int64_t gets_done
= 0;
5282 int64_t puts_done
= 0;
5283 int64_t deletes_done
= 0;
5285 std::unique_ptr
<const char[]> key_guard
;
5286 Slice key
= AllocateKey(&key_guard
);
5288 // the number of iterations is the larger of read_ or write_
5289 for (int64_t i
= 0; i
< readwrites_
; i
++) {
5290 DB
* db
= SelectDB(thread
);
5291 if (get_weight
== 0 && put_weight
== 0 && delete_weight
== 0) {
5292 // one batch completed, reinitialize for next batch
5293 get_weight
= FLAGS_readwritepercent
;
5294 delete_weight
= FLAGS_deletepercent
;
5295 put_weight
= 100 - get_weight
- delete_weight
;
5297 GenerateKeyFromInt(thread
->rand
.Next() % FLAGS_numdistinct
,
5298 FLAGS_numdistinct
, &key
);
5299 if (get_weight
> 0) {
5300 // do all the gets first
5301 Status s
= GetMany(db
, options
, key
, &value
);
5302 if (!s
.ok() && !s
.IsNotFound()) {
5303 fprintf(stderr
, "getmany error: %s\n", s
.ToString().c_str());
5304 // we continue after error rather than exiting so that we can
5305 // find more errors if any
5306 } else if (!s
.IsNotFound()) {
5311 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kRead
);
5312 } else if (put_weight
> 0) {
5313 // then do all the corresponding number of puts
5314 // for all the gets we have done earlier
5315 Status s
= PutMany(db
, write_options_
, key
, gen
.Generate(value_size_
));
5317 fprintf(stderr
, "putmany error: %s\n", s
.ToString().c_str());
5322 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kWrite
);
5323 } else if (delete_weight
> 0) {
5324 Status s
= DeleteMany(db
, write_options_
, key
);
5326 fprintf(stderr
, "deletemany error: %s\n", s
.ToString().c_str());
5331 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kDelete
);
5335 snprintf(msg
, sizeof(msg
),
5336 "( get:%" PRIu64
" put:%" PRIu64
" del:%" PRIu64
" total:%" \
5337 PRIu64
" found:%" PRIu64
")",
5338 gets_done
, puts_done
, deletes_done
, readwrites_
, found
);
5339 thread
->stats
.AddMessage(msg
);
5342 // This is different from ReadWhileWriting because it does not use
5344 void ReadRandomWriteRandom(ThreadState
* thread
) {
5345 ReadOptions
options(FLAGS_verify_checksum
, true);
5346 RandomGenerator gen
;
5351 int64_t reads_done
= 0;
5352 int64_t writes_done
= 0;
5353 Duration
duration(FLAGS_duration
, readwrites_
);
5355 std::unique_ptr
<const char[]> key_guard
;
5356 Slice key
= AllocateKey(&key_guard
);
5358 // the number of iterations is the larger of read_ or write_
5359 while (!duration
.Done(1)) {
5360 DB
* db
= SelectDB(thread
);
5361 GenerateKeyFromInt(thread
->rand
.Next() % FLAGS_num
, FLAGS_num
, &key
);
5362 if (get_weight
== 0 && put_weight
== 0) {
5363 // one batch completed, reinitialize for next batch
5364 get_weight
= FLAGS_readwritepercent
;
5365 put_weight
= 100 - get_weight
;
5367 if (get_weight
> 0) {
5368 // do all the gets first
5369 Status s
= db
->Get(options
, key
, &value
);
5370 if (!s
.ok() && !s
.IsNotFound()) {
5371 fprintf(stderr
, "get error: %s\n", s
.ToString().c_str());
5372 // we continue after error rather than exiting so that we can
5373 // find more errors if any
5374 } else if (!s
.IsNotFound()) {
5379 thread
->stats
.FinishedOps(nullptr, db
, 1, kRead
);
5380 } else if (put_weight
> 0) {
5381 // then do all the corresponding number of puts
5382 // for all the gets we have done earlier
5383 Status s
= db
->Put(write_options_
, key
, gen
.Generate(value_size_
));
5385 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
5390 thread
->stats
.FinishedOps(nullptr, db
, 1, kWrite
);
5394 snprintf(msg
, sizeof(msg
), "( reads:%" PRIu64
" writes:%" PRIu64 \
5395 " total:%" PRIu64
" found:%" PRIu64
")",
5396 reads_done
, writes_done
, readwrites_
, found
);
5397 thread
->stats
.AddMessage(msg
);
5401 // Read-modify-write for random keys
5402 void UpdateRandom(ThreadState
* thread
) {
5403 ReadOptions
options(FLAGS_verify_checksum
, true);
5404 RandomGenerator gen
;
5408 Duration
duration(FLAGS_duration
, readwrites_
);
5410 std::unique_ptr
<const char[]> key_guard
;
5411 Slice key
= AllocateKey(&key_guard
);
5412 // the number of iterations is the larger of read_ or write_
5413 while (!duration
.Done(1)) {
5414 DB
* db
= SelectDB(thread
);
5415 GenerateKeyFromInt(thread
->rand
.Next() % FLAGS_num
, FLAGS_num
, &key
);
5417 auto status
= db
->Get(options
, key
, &value
);
5420 bytes
+= key
.size() + value
.size();
5421 } else if (!status
.IsNotFound()) {
5422 fprintf(stderr
, "Get returned an error: %s\n",
5423 status
.ToString().c_str());
5427 if (thread
->shared
->write_rate_limiter
) {
5428 thread
->shared
->write_rate_limiter
->Request(
5429 key
.size() + value_size_
, Env::IO_HIGH
, nullptr /*stats*/,
5430 RateLimiter::OpType::kWrite
);
5433 Status s
= db
->Put(write_options_
, key
, gen
.Generate(value_size_
));
5435 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
5438 bytes
+= key
.size() + value_size_
;
5439 thread
->stats
.FinishedOps(nullptr, db
, 1, kUpdate
);
5442 snprintf(msg
, sizeof(msg
),
5443 "( updates:%" PRIu64
" found:%" PRIu64
")", readwrites_
, found
);
5444 thread
->stats
.AddBytes(bytes
);
5445 thread
->stats
.AddMessage(msg
);
5448 // Read-XOR-write for random keys. Xors the existing value with a randomly
5449 // generated value, and stores the result. Assuming A in the array of bytes
5450 // representing the existing value, we generate an array B of the same size,
5451 // then compute C = A^B as C[i]=A[i]^B[i], and store C
5452 void XORUpdateRandom(ThreadState
* thread
) {
5453 ReadOptions
options(FLAGS_verify_checksum
, true);
5454 RandomGenerator gen
;
5455 std::string existing_value
;
5457 Duration
duration(FLAGS_duration
, readwrites_
);
5459 BytesXOROperator xor_operator
;
5461 std::unique_ptr
<const char[]> key_guard
;
5462 Slice key
= AllocateKey(&key_guard
);
5463 // the number of iterations is the larger of read_ or write_
5464 while (!duration
.Done(1)) {
5465 DB
* db
= SelectDB(thread
);
5466 GenerateKeyFromInt(thread
->rand
.Next() % FLAGS_num
, FLAGS_num
, &key
);
5468 auto status
= db
->Get(options
, key
, &existing_value
);
5471 } else if (!status
.IsNotFound()) {
5472 fprintf(stderr
, "Get returned an error: %s\n",
5473 status
.ToString().c_str());
5477 Slice value
= gen
.Generate(value_size_
);
5478 std::string new_value
;
5481 Slice existing_value_slice
= Slice(existing_value
);
5482 xor_operator
.XOR(&existing_value_slice
, value
, &new_value
);
5484 xor_operator
.XOR(nullptr, value
, &new_value
);
5487 Status s
= db
->Put(write_options_
, key
, Slice(new_value
));
5489 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
5492 thread
->stats
.FinishedOps(nullptr, db
, 1);
5495 snprintf(msg
, sizeof(msg
),
5496 "( updates:%" PRIu64
" found:%" PRIu64
")", readwrites_
, found
);
5497 thread
->stats
.AddMessage(msg
);
5500 // Read-modify-write for random keys.
5501 // Each operation causes the key grow by value_size (simulating an append).
5502 // Generally used for benchmarking against merges of similar type
5503 void AppendRandom(ThreadState
* thread
) {
5504 ReadOptions
options(FLAGS_verify_checksum
, true);
5505 RandomGenerator gen
;
5510 std::unique_ptr
<const char[]> key_guard
;
5511 Slice key
= AllocateKey(&key_guard
);
5512 // The number of iterations is the larger of read_ or write_
5513 Duration
duration(FLAGS_duration
, readwrites_
);
5514 while (!duration
.Done(1)) {
5515 DB
* db
= SelectDB(thread
);
5516 GenerateKeyFromInt(thread
->rand
.Next() % FLAGS_num
, FLAGS_num
, &key
);
5518 auto status
= db
->Get(options
, key
, &value
);
5521 bytes
+= key
.size() + value
.size();
5522 } else if (!status
.IsNotFound()) {
5523 fprintf(stderr
, "Get returned an error: %s\n",
5524 status
.ToString().c_str());
5527 // If not existing, then just assume an empty string of data
5531 // Update the value (by appending data)
5532 Slice operand
= gen
.Generate(value_size_
);
5533 if (value
.size() > 0) {
5534 // Use a delimiter to match the semantics for StringAppendOperator
5535 value
.append(1,',');
5537 value
.append(operand
.data(), operand
.size());
5539 // Write back to the database
5540 Status s
= db
->Put(write_options_
, key
, value
);
5542 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
5545 bytes
+= key
.size() + value
.size();
5546 thread
->stats
.FinishedOps(nullptr, db
, 1, kUpdate
);
5550 snprintf(msg
, sizeof(msg
), "( updates:%" PRIu64
" found:%" PRIu64
")",
5551 readwrites_
, found
);
5552 thread
->stats
.AddBytes(bytes
);
5553 thread
->stats
.AddMessage(msg
);
5556 // Read-modify-write for random keys (using MergeOperator)
5557 // The merge operator to use should be defined by FLAGS_merge_operator
5558 // Adjust FLAGS_value_size so that the keys are reasonable for this operator
5559 // Assumes that the merge operator is non-null (i.e.: is well-defined)
5561 // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
5562 // to simulate random additions over 64-bit integers using merge.
5564 // The number of merges on the same key can be controlled by adjusting
5565 // FLAGS_merge_keys.
5566 void MergeRandom(ThreadState
* thread
) {
5567 RandomGenerator gen
;
5569 std::unique_ptr
<const char[]> key_guard
;
5570 Slice key
= AllocateKey(&key_guard
);
5571 // The number of iterations is the larger of read_ or write_
5572 Duration
duration(FLAGS_duration
, readwrites_
);
5573 while (!duration
.Done(1)) {
5574 DBWithColumnFamilies
* db_with_cfh
= SelectDBWithCfh(thread
);
5575 int64_t key_rand
= thread
->rand
.Next() % merge_keys_
;
5576 GenerateKeyFromInt(key_rand
, merge_keys_
, &key
);
5579 if (FLAGS_num_column_families
> 1) {
5580 s
= db_with_cfh
->db
->Merge(write_options_
,
5581 db_with_cfh
->GetCfh(key_rand
), key
,
5582 gen
.Generate(value_size_
));
5584 s
= db_with_cfh
->db
->Merge(write_options_
,
5585 db_with_cfh
->db
->DefaultColumnFamily(), key
,
5586 gen
.Generate(value_size_
));
5590 fprintf(stderr
, "merge error: %s\n", s
.ToString().c_str());
5593 bytes
+= key
.size() + value_size_
;
5594 thread
->stats
.FinishedOps(nullptr, db_with_cfh
->db
, 1, kMerge
);
5597 // Print some statistics
5599 snprintf(msg
, sizeof(msg
), "( updates:%" PRIu64
")", readwrites_
);
5600 thread
->stats
.AddBytes(bytes
);
5601 thread
->stats
.AddMessage(msg
);
5604 // Read and merge random keys. The amount of reads and merges are controlled
5605 // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
5606 // keys (and thus also the number of reads and merges on the same key) can be
5607 // adjusted with FLAGS_merge_keys.
5609 // As with MergeRandom, the merge operator to use should be defined by
5610 // FLAGS_merge_operator.
5611 void ReadRandomMergeRandom(ThreadState
* thread
) {
5612 ReadOptions
options(FLAGS_verify_checksum
, true);
5613 RandomGenerator gen
;
5615 int64_t num_hits
= 0;
5616 int64_t num_gets
= 0;
5617 int64_t num_merges
= 0;
5618 size_t max_length
= 0;
5620 std::unique_ptr
<const char[]> key_guard
;
5621 Slice key
= AllocateKey(&key_guard
);
5622 // the number of iterations is the larger of read_ or write_
5623 Duration
duration(FLAGS_duration
, readwrites_
);
5624 while (!duration
.Done(1)) {
5625 DB
* db
= SelectDB(thread
);
5626 GenerateKeyFromInt(thread
->rand
.Next() % merge_keys_
, merge_keys_
, &key
);
5628 bool do_merge
= int(thread
->rand
.Next() % 100) < FLAGS_mergereadpercent
;
5631 Status s
= db
->Merge(write_options_
, key
, gen
.Generate(value_size_
));
5633 fprintf(stderr
, "merge error: %s\n", s
.ToString().c_str());
5637 thread
->stats
.FinishedOps(nullptr, db
, 1, kMerge
);
5639 Status s
= db
->Get(options
, key
, &value
);
5640 if (value
.length() > max_length
)
5641 max_length
= value
.length();
5643 if (!s
.ok() && !s
.IsNotFound()) {
5644 fprintf(stderr
, "get error: %s\n", s
.ToString().c_str());
5645 // we continue after error rather than exiting so that we can
5646 // find more errors if any
5647 } else if (!s
.IsNotFound()) {
5651 thread
->stats
.FinishedOps(nullptr, db
, 1, kRead
);
5656 snprintf(msg
, sizeof(msg
),
5657 "(reads:%" PRIu64
" merges:%" PRIu64
" total:%" PRIu64
5658 " hits:%" PRIu64
" maxlength:%" ROCKSDB_PRIszt
")",
5659 num_gets
, num_merges
, readwrites_
, num_hits
, max_length
);
5660 thread
->stats
.AddMessage(msg
);
5663 void WriteSeqSeekSeq(ThreadState
* thread
) {
5664 writes_
= FLAGS_num
;
5665 DoWrite(thread
, SEQUENTIAL
);
5666 // exclude writes from the ops/sec calculation
5667 thread
->stats
.Start(thread
->tid
);
5669 DB
* db
= SelectDB(thread
);
5670 std::unique_ptr
<Iterator
> iter(
5671 db
->NewIterator(ReadOptions(FLAGS_verify_checksum
, true)));
5673 std::unique_ptr
<const char[]> key_guard
;
5674 Slice key
= AllocateKey(&key_guard
);
5675 for (int64_t i
= 0; i
< FLAGS_num
; ++i
) {
5676 GenerateKeyFromInt(i
, FLAGS_num
, &key
);
5678 assert(iter
->Valid() && iter
->key() == key
);
5679 thread
->stats
.FinishedOps(nullptr, db
, 1, kSeek
);
5681 for (int j
= 0; j
< FLAGS_seek_nexts
&& i
+ 1 < FLAGS_num
; ++j
) {
5682 if (!FLAGS_reverse_iterator
) {
5687 GenerateKeyFromInt(++i
, FLAGS_num
, &key
);
5688 assert(iter
->Valid() && iter
->key() == key
);
5689 thread
->stats
.FinishedOps(nullptr, db
, 1, kSeek
);
5693 assert(iter
->Valid() && iter
->key() == key
);
5694 thread
->stats
.FinishedOps(nullptr, db
, 1, kSeek
);
5698 #ifndef ROCKSDB_LITE
5699 // This benchmark stress tests Transactions. For a given --duration (or
5700 // total number of --writes, a Transaction will perform a read-modify-write
5701 // to increment the value of a key in each of N(--transaction-sets) sets of
5702 // keys (where each set has --num keys). If --threads is set, this will be
5703 // done in parallel.
5705 // To test transactions, use --transaction_db=true. Not setting this
5707 // will run the same benchmark without transactions.
5709 // RandomTransactionVerify() will then validate the correctness of the results
5710 // by checking if the sum of all keys in each set is the same.
5711 void RandomTransaction(ThreadState
* thread
) {
5712 ReadOptions
options(FLAGS_verify_checksum
, true);
5713 Duration
duration(FLAGS_duration
, readwrites_
);
5714 ReadOptions
read_options(FLAGS_verify_checksum
, true);
5715 uint16_t num_prefix_ranges
= static_cast<uint16_t>(FLAGS_transaction_sets
);
5716 uint64_t transactions_done
= 0;
5718 if (num_prefix_ranges
== 0 || num_prefix_ranges
> 9999) {
5719 fprintf(stderr
, "invalid value for transaction_sets\n");
5723 TransactionOptions txn_options
;
5724 txn_options
.lock_timeout
= FLAGS_transaction_lock_timeout
;
5725 txn_options
.set_snapshot
= FLAGS_transaction_set_snapshot
;
5727 RandomTransactionInserter
inserter(&thread
->rand
, write_options_
,
5728 read_options
, FLAGS_num
,
5731 if (FLAGS_num_multi_db
> 1) {
5733 "Cannot run RandomTransaction benchmark with "
5734 "FLAGS_multi_db > 1.");
5738 while (!duration
.Done(1)) {
5741 // RandomTransactionInserter will attempt to insert a key for each
5742 // # of FLAGS_transaction_sets
5743 if (FLAGS_optimistic_transaction_db
) {
5744 success
= inserter
.OptimisticTransactionDBInsert(db_
.opt_txn_db
);
5745 } else if (FLAGS_transaction_db
) {
5746 TransactionDB
* txn_db
= reinterpret_cast<TransactionDB
*>(db_
.db
);
5747 success
= inserter
.TransactionDBInsert(txn_db
, txn_options
);
5749 success
= inserter
.DBInsert(db_
.db
);
5753 fprintf(stderr
, "Unexpected error: %s\n",
5754 inserter
.GetLastStatus().ToString().c_str());
5758 thread
->stats
.FinishedOps(nullptr, db_
.db
, 1, kOthers
);
5759 transactions_done
++;
5763 if (FLAGS_optimistic_transaction_db
|| FLAGS_transaction_db
) {
5764 snprintf(msg
, sizeof(msg
),
5765 "( transactions:%" PRIu64
" aborts:%" PRIu64
")",
5766 transactions_done
, inserter
.GetFailureCount());
5768 snprintf(msg
, sizeof(msg
), "( batches:%" PRIu64
" )", transactions_done
);
5770 thread
->stats
.AddMessage(msg
);
5772 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
5773 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
5774 get_perf_context()->ToString());
5776 thread
->stats
.AddBytes(static_cast<int64_t>(inserter
.GetBytesInserted()));
5779 // Verifies consistency of data after RandomTransaction() has been run.
5780 // Since each iteration of RandomTransaction() incremented a key in each set
5781 // by the same value, the sum of the keys in each set should be the same.
5782 void RandomTransactionVerify() {
5783 if (!FLAGS_transaction_db
&& !FLAGS_optimistic_transaction_db
) {
5784 // transactions not used, nothing to verify.
5789 RandomTransactionInserter::Verify(db_
.db
,
5790 static_cast<uint16_t>(FLAGS_transaction_sets
));
5793 fprintf(stdout
, "RandomTransactionVerify Success.\n");
5795 fprintf(stdout
, "RandomTransactionVerify FAILED!!\n");
5798 #endif // ROCKSDB_LITE
5800 // Writes and deletes random keys without overwriting keys.
5802 // This benchmark is intended to partially replicate the behavior of MyRocks
5803 // secondary indices: All data is stored in keys and updates happen by
5804 // deleting the old version of the key and inserting the new version.
5805 void RandomReplaceKeys(ThreadState
* thread
) {
5806 std::unique_ptr
<const char[]> key_guard
;
5807 Slice key
= AllocateKey(&key_guard
);
5808 std::vector
<uint32_t> counters(FLAGS_numdistinct
, 0);
5809 size_t max_counter
= 50;
5810 RandomGenerator gen
;
5813 DB
* db
= SelectDB(thread
);
5814 for (int64_t i
= 0; i
< FLAGS_numdistinct
; i
++) {
5815 GenerateKeyFromInt(i
* max_counter
, FLAGS_num
, &key
);
5816 s
= db
->Put(write_options_
, key
, gen
.Generate(value_size_
));
5818 fprintf(stderr
, "Operation failed: %s\n", s
.ToString().c_str());
5825 std::default_random_engine generator
;
5826 std::normal_distribution
<double> distribution(FLAGS_numdistinct
/ 2.0,
5828 Duration
duration(FLAGS_duration
, FLAGS_num
);
5829 while (!duration
.Done(1)) {
5830 int64_t rnd_id
= static_cast<int64_t>(distribution(generator
));
5831 int64_t key_id
= std::max(std::min(FLAGS_numdistinct
- 1, rnd_id
),
5832 static_cast<int64_t>(0));
5833 GenerateKeyFromInt(key_id
* max_counter
+ counters
[key_id
], FLAGS_num
,
5835 s
= FLAGS_use_single_deletes
? db
->SingleDelete(write_options_
, key
)
5836 : db
->Delete(write_options_
, key
);
5838 counters
[key_id
] = (counters
[key_id
] + 1) % max_counter
;
5839 GenerateKeyFromInt(key_id
* max_counter
+ counters
[key_id
], FLAGS_num
,
5841 s
= db
->Put(write_options_
, key
, Slice());
5845 fprintf(stderr
, "Operation failed: %s\n", s
.ToString().c_str());
5849 thread
->stats
.FinishedOps(nullptr, db
, 1, kOthers
);
5853 snprintf(msg
, sizeof(msg
),
5854 "use single deletes: %d, "
5855 "standard deviation: %lf\n",
5856 FLAGS_use_single_deletes
, FLAGS_stddev
);
5857 thread
->stats
.AddMessage(msg
);
5860 void TimeSeriesReadOrDelete(ThreadState
* thread
, bool do_deletion
) {
5861 ReadOptions
options(FLAGS_verify_checksum
, true);
5866 Iterator
* iter
= nullptr;
5867 // Only work on single database
5868 assert(db_
.db
!= nullptr);
5869 iter
= db_
.db
->NewIterator(options
);
5871 std::unique_ptr
<const char[]> key_guard
;
5872 Slice key
= AllocateKey(&key_guard
);
5874 char value_buffer
[256];
5877 MutexLock
l(&thread
->shared
->mu
);
5878 if (thread
->shared
->num_done
>= 1) {
5879 // Write thread have finished
5883 if (!FLAGS_use_tailing_iterator
) {
5885 iter
= db_
.db
->NewIterator(options
);
5887 // Pick a Iterator to use
5889 int64_t key_id
= thread
->rand
.Next() % FLAGS_key_id_range
;
5890 GenerateKeyFromInt(key_id
, FLAGS_num
, &key
);
5891 // Reset last 8 bytes to 0
5892 char* start
= const_cast<char*>(key
.data());
5893 start
+= key
.size() - 8;
5894 memset(start
, 0, 8);
5897 bool key_found
= false;
5899 for (iter
->Seek(key
); iter
->Valid() && iter
->key().starts_with(key
);
5902 // Copy out iterator's value to make sure we read them.
5904 bytes
+= iter
->key().size();
5905 if (KeyExpired(timestamp_emulator_
.get(), iter
->key())) {
5906 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kDelete
);
5907 db_
.db
->Delete(write_options_
, iter
->key());
5912 bytes
+= iter
->key().size() + iter
->value().size();
5913 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kRead
);
5914 Slice value
= iter
->value();
5915 memcpy(value_buffer
, value
.data(),
5916 std::min(value
.size(), sizeof(value_buffer
)));
5918 assert(iter
->status().ok());
5923 if (thread
->shared
->read_rate_limiter
.get() != nullptr) {
5924 thread
->shared
->read_rate_limiter
->Request(
5925 1, Env::IO_HIGH
, nullptr /* stats */, RateLimiter::OpType::kRead
);
5931 snprintf(msg
, sizeof(msg
), "(%" PRIu64
" of %" PRIu64
" found)", found
,
5933 thread
->stats
.AddBytes(bytes
);
5934 thread
->stats
.AddMessage(msg
);
5935 if (FLAGS_perf_level
> rocksdb::PerfLevel::kDisable
) {
5936 thread
->stats
.AddMessage(std::string("PERF_CONTEXT:\n") +
5937 get_perf_context()->ToString());
5941 void TimeSeriesWrite(ThreadState
* thread
) {
5942 // Special thread that keeps writing until other threads are done.
5943 RandomGenerator gen
;
5946 // Don't merge stats from this thread with the readers.
5947 thread
->stats
.SetExcludeFromMerge();
5949 std::unique_ptr
<RateLimiter
> write_rate_limiter
;
5950 if (FLAGS_benchmark_write_rate_limit
> 0) {
5951 write_rate_limiter
.reset(
5952 NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit
));
5955 std::unique_ptr
<const char[]> key_guard
;
5956 Slice key
= AllocateKey(&key_guard
);
5958 Duration
duration(FLAGS_duration
, writes_
);
5959 while (!duration
.Done(1)) {
5960 DB
* db
= SelectDB(thread
);
5962 uint64_t key_id
= thread
->rand
.Next() % FLAGS_key_id_range
;
5964 GenerateKeyFromInt(key_id
, FLAGS_num
, &key
);
5967 char* start
= const_cast<char*>(key
.data());
5968 char* pos
= start
+ 8;
5970 std::min(key_size_
- static_cast<int>(pos
- start
), 8);
5971 uint64_t timestamp_value
= timestamp_emulator_
->Get();
5972 if (port::kLittleEndian
) {
5973 for (int i
= 0; i
< bytes_to_fill
; ++i
) {
5974 pos
[i
] = (timestamp_value
>> ((bytes_to_fill
- i
- 1) << 3)) & 0xFF;
5977 memcpy(pos
, static_cast<void*>(×tamp_value
), bytes_to_fill
);
5980 timestamp_emulator_
->Inc();
5984 s
= db
->Put(write_options_
, key
, gen
.Generate(value_size_
));
5987 fprintf(stderr
, "put error: %s\n", s
.ToString().c_str());
5990 bytes
= key
.size() + value_size_
;
5991 thread
->stats
.FinishedOps(&db_
, db_
.db
, 1, kWrite
);
5992 thread
->stats
.AddBytes(bytes
);
5994 if (FLAGS_benchmark_write_rate_limit
> 0) {
5995 write_rate_limiter
->Request(
5996 entries_per_batch_
* (value_size_
+ key_size_
), Env::IO_HIGH
,
5997 nullptr /* stats */, RateLimiter::OpType::kWrite
);
6002 void TimeSeries(ThreadState
* thread
) {
6003 if (thread
->tid
> 0) {
6004 bool do_deletion
= FLAGS_expire_style
== "delete" &&
6005 thread
->tid
<= FLAGS_num_deletion_threads
;
6006 TimeSeriesReadOrDelete(thread
, do_deletion
);
6008 TimeSeriesWrite(thread
);
6009 thread
->stats
.Stop();
6010 thread
->stats
.Report("timeseries write");
6014 void Compact(ThreadState
* thread
) {
6015 DB
* db
= SelectDB(thread
);
6016 CompactRangeOptions cro
;
6017 cro
.bottommost_level_compaction
= BottommostLevelCompaction::kForce
;
6018 db
->CompactRange(cro
, nullptr, nullptr);
6022 if (db_
.db
!= nullptr) {
6023 db_
.db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
6025 for (const auto& db_with_cfh
: multi_dbs_
) {
6026 db_with_cfh
.db
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
6031 if (db_
.db
!= nullptr) {
6032 db_
.db
->ResetStats();
6034 for (const auto& db_with_cfh
: multi_dbs_
) {
6035 db_with_cfh
.db
->ResetStats();
6039 void PrintStats(const char* key
) {
6040 if (db_
.db
!= nullptr) {
6041 PrintStats(db_
.db
, key
, false);
6043 for (const auto& db_with_cfh
: multi_dbs_
) {
6044 PrintStats(db_with_cfh
.db
, key
, true);
6048 void PrintStats(DB
* db
, const char* key
, bool print_header
= false) {
6050 fprintf(stdout
, "\n==== DB: %s ===\n", db
->GetName().c_str());
6053 if (!db
->GetProperty(key
, &stats
)) {
6056 fprintf(stdout
, "\n%s\n", stats
.c_str());
6059 void Replay(ThreadState
* thread
) {
6060 if (db_
.db
!= nullptr) {
6061 Replay(thread
, &db_
);
6065 void Replay(ThreadState
* /*thread*/, DBWithColumnFamilies
* db_with_cfh
) {
6067 std::unique_ptr
<TraceReader
> trace_reader
;
6068 s
= NewFileTraceReader(FLAGS_env
, EnvOptions(), FLAGS_trace_file
,
6073 "Encountered an error creating a TraceReader from the trace file. "
6075 s
.ToString().c_str());
6078 Replayer
replayer(db_with_cfh
->db
, db_with_cfh
->cfh
,
6079 std::move(trace_reader
));
6080 s
= replayer
.Replay();
6082 fprintf(stdout
, "Replay started from trace_file: %s\n",
6083 FLAGS_trace_file
.c_str());
6085 fprintf(stderr
, "Starting replay failed. Error: %s\n",
6086 s
.ToString().c_str());
// Entry point for the db_bench tool: parses gflags command-line options,
// translates string/int flags into their enum equivalents, configures the
// Env and statistics objects, then constructs the Benchmark.
// NOTE(review): this extract is elided — gaps in the embedded original line
// numbers (6093 -> 6095, 6114 -> 6118, 6149 -> 6153, 6203 -> 6206, ...)
// mean that exit(1) error branches, closing braces, the usage string, the
// benchmark.Run() call, and the final return are missing from this view.
6091 int db_bench_tool(int argc
, char** argv
) {
// Install a handler so crashes print a stack trace.
6092 rocksdb::port::InstallStackTraceHandler();
6093 static bool initialized
= false;
// Build the gflags usage banner from argv[0] (remainder of the string is
// on elided lines), then parse the command line in place.
6095 SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv
[0]) +
6099 ParseCommandLineFlags(&argc
, &argv
, true);
// Translate the integer --compaction_style flag into the enum the options
// struct expects.
6100 FLAGS_compaction_style_e
= (rocksdb::CompactionStyle
) FLAGS_compaction_style
;
6101 #ifndef ROCKSDB_LITE
// --statistics and --statistics_string are mutually exclusive (the error
// fprintf/exit lines are elided here).
6102 if (FLAGS_statistics
&& !FLAGS_statistics_string
.empty()) {
6104 "Cannot provide both --statistics and --statistics_string.\n");
// A non-empty --statistics_string selects a registered custom Statistics
// implementation via the object registry.
6107 if (!FLAGS_statistics_string
.empty()) {
6108 std::unique_ptr
<Statistics
> custom_stats_guard
;
6109 dbstats
.reset(NewCustomObject
<Statistics
>(FLAGS_statistics_string
,
6110 &custom_stats_guard
));
// dbstats now owns the object; release the guard so it is not
// double-deleted.
6111 custom_stats_guard
.release();
// Registry lookup failed: no Statistics matched the given string.
6112 if (dbstats
== nullptr) {
6113 fprintf(stderr
, "No Statistics registered matching string: %s\n",
6114 FLAGS_statistics_string
.c_str());
6118 #endif // ROCKSDB_LITE
// Plain --statistics: use the default DB statistics object.
6119 if (FLAGS_statistics
) {
6120 dbstats
= rocksdb::CreateDBStatistics();
// Apply the requested stats collection level (guard condition elided).
6123 dbstats
->set_stats_level(static_cast<StatsLevel
>(FLAGS_stats_level
));
// Translate the integer --compaction_pri flag into its enum.
6125 FLAGS_compaction_pri_e
= (rocksdb::CompactionPri
)FLAGS_compaction_pri
;
// Parse the comma-separated per-level multiplier list into ints.
6127 std::vector
<std::string
> fanout
= rocksdb::StringSplit(
6128 FLAGS_max_bytes_for_level_multiplier_additional
, ',');
6129 for (size_t j
= 0; j
< fanout
.size(); j
++) {
6130 FLAGS_max_bytes_for_level_multiplier_additional_v
.push_back(
6132 std::stoi(fanout
[j
]));
// Resolve the compression type name to its enum value.
6138 FLAGS_compression_type_e
=
6139 StringToCompressionType(FLAGS_compression_type
.c_str());
6141 #ifndef ROCKSDB_LITE
// Optionally replace the default Env: --hdfs and --env_uri are mutually
// exclusive; --env_uri selects a registered Env from the object registry.
6142 std::unique_ptr
<Env
> custom_env_guard
;
6143 if (!FLAGS_hdfs
.empty() && !FLAGS_env_uri
.empty()) {
6144 fprintf(stderr
, "Cannot provide both --hdfs and --env_uri.\n");
6146 } else if (!FLAGS_env_uri
.empty()) {
6147 FLAGS_env
= NewCustomObject
<Env
>(FLAGS_env_uri
, &custom_env_guard
);
// Registry lookup failed: no Env matched the given URI.
6148 if (FLAGS_env
== nullptr) {
6149 fprintf(stderr
, "No Env registered for URI: %s\n", FLAGS_env_uri
.c_str());
6153 #endif // ROCKSDB_LITE
// --use_existing_keys only makes sense against an existing DB (rest of
// the error message and exit are on elided lines).
6154 if (FLAGS_use_existing_keys
&& !FLAGS_use_existing_db
) {
6156 "`-use_existing_db` must be true for `-use_existing_keys` to be "
// --hdfs swaps in an HdfsEnv. NOTE(review): raw `new` with no visible
// owner — presumably intentional for a process-lifetime Env; confirm.
6161 if (!FLAGS_hdfs
.empty()) {
6162 FLAGS_env
= new rocksdb::HdfsEnv(FLAGS_hdfs
);
// Map the --compaction_fadvice string (case-insensitive) onto the
// Options access-hint enum; unknown values fall through to the message
// below (the exit, if any, is elided).
6165 if (!strcasecmp(FLAGS_compaction_fadvice
.c_str(), "NONE"))
6166 FLAGS_compaction_fadvice_e
= rocksdb::Options::NONE
;
6167 else if (!strcasecmp(FLAGS_compaction_fadvice
.c_str(), "NORMAL"))
6168 FLAGS_compaction_fadvice_e
= rocksdb::Options::NORMAL
;
6169 else if (!strcasecmp(FLAGS_compaction_fadvice
.c_str(), "SEQUENTIAL"))
6170 FLAGS_compaction_fadvice_e
= rocksdb::Options::SEQUENTIAL
;
6171 else if (!strcasecmp(FLAGS_compaction_fadvice
.c_str(), "WILLNEED"))
6172 FLAGS_compaction_fadvice_e
= rocksdb::Options::WILLNEED
;
6174 fprintf(stdout
, "Unknown compaction fadvice:%s\n",
6175 FLAGS_compaction_fadvice
.c_str());
// Resolve the memtable representation factory from its name.
6178 FLAGS_rep_factory
= StringToRepFactory(FLAGS_memtablerep
.c_str());
6180 // Note options sanitization may increase thread pool sizes according to
6181 // max_background_flushes/max_background_compactions/max_background_jobs
// Pre-size the Env's background thread pools from the corresponding flags.
6182 FLAGS_env
->SetBackgroundThreads(FLAGS_num_high_pri_threads
,
6183 rocksdb::Env::Priority::HIGH
);
6184 FLAGS_env
->SetBackgroundThreads(FLAGS_num_bottom_pri_threads
,
6185 rocksdb::Env::Priority::BOTTOM
);
6186 FLAGS_env
->SetBackgroundThreads(FLAGS_num_low_pri_threads
,
6187 rocksdb::Env::Priority::LOW
);
6189 // Choose a location for the test database if none given with --db=<path>
6190 if (FLAGS_db
.empty()) {
6191 std::string default_db_path
;
// Ask the default Env for its test directory and append a fixed suffix.
6192 rocksdb::Env::Default()->GetTestDirectory(&default_db_path
);
6193 default_db_path
+= "/dbbench";
6194 FLAGS_db
= default_db_path
;
6197 if (FLAGS_stats_interval_seconds
> 0) {
6198 // When both are set then FLAGS_stats_interval determines the frequency
6199 // at which the timer is checked for FLAGS_stats_interval_seconds
6200 FLAGS_stats_interval
= 1000;
// Construct the benchmark driver (its Run() call is on elided lines).
6203 rocksdb::Benchmark benchmark
;
6206 #ifndef ROCKSDB_LITE
// Optionally dump allocator statistics after the run.
6207 if (FLAGS_print_malloc_stats
) {
6208 std::string stats_string
;
6209 rocksdb::DumpMallocStats(&stats_string
);
6210 fprintf(stdout
, "Malloc stats:\n%s\n", stats_string
.c_str());
6212 #endif // ROCKSDB_LITE
6216 } // namespace rocksdb