1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // The test uses an array to compare against values written to the database.
11 // Keys written to the array are in 1:1 correspondence to the actual values in
12 // the database according to the formula in the function GenerateValue.
14 // Space is reserved in the array from 0 to FLAGS_max_key and values are
15 // randomly written/deleted/read from those positions. During verification we
16 // compare all the positions in the array. To shorten/elongate the running
17 // time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread,
18 // (sometimes also FLAGS_threads).
20 // NOTE that if FLAGS_test_batches_snapshots is set, the test will have
21 // different behavior. See comment of the flag for details.
26 fprintf(stderr
, "Please install gflags to run rocksdb tools\n");
31 #ifndef __STDC_FORMAT_MACROS
32 #define __STDC_FORMAT_MACROS
33 #endif // __STDC_FORMAT_MACROS
39 #include <sys/types.h>
46 #include "db/db_impl.h"
47 #include "db/version_set.h"
48 #include "hdfs/env_hdfs.h"
49 #include "monitoring/histogram.h"
50 #include "options/options_helper.h"
51 #include "port/port.h"
52 #include "rocksdb/cache.h"
53 #include "rocksdb/env.h"
54 #include "rocksdb/slice.h"
55 #include "rocksdb/slice_transform.h"
56 #include "rocksdb/statistics.h"
57 #include "rocksdb/utilities/backupable_db.h"
58 #include "rocksdb/utilities/checkpoint.h"
59 #include "rocksdb/utilities/db_ttl.h"
60 #include "rocksdb/utilities/options_util.h"
61 #include "rocksdb/utilities/transaction.h"
62 #include "rocksdb/utilities/transaction_db.h"
63 #include "rocksdb/write_batch.h"
64 #include "util/coding.h"
65 #include "util/compression.h"
66 #include "util/crc32c.h"
67 #include "util/gflags_compat.h"
68 #include "util/logging.h"
69 #include "util/mutexlock.h"
70 #include "util/random.h"
71 #include "util/string_util.h"
72 // SyncPoint is not supported in Released Windows Mode.
73 #if !(defined NDEBUG) || !defined(OS_WIN)
74 #include "util/sync_point.h"
75 #endif // !(defined NDEBUG) || !defined(OS_WIN)
76 #include "util/testutil.h"
78 #include "utilities/merge_operators.h"
80 using GFLAGS_NAMESPACE::ParseCommandLineFlags
;
81 using GFLAGS_NAMESPACE::RegisterFlagValidator
;
82 using GFLAGS_NAMESPACE::SetUsageMessage
;
84 static const long KB
= 1024;
85 static const int kRandomValueMaxFactor
= 3;
86 static const int kValueMaxLen
= 100;
// Validates that a uint64_t flag value fits in a uint32_t.
// Registered as a gflags flag validator; returning false rejects the
// flag value at parse time.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // Print with PRIu64 (already used elsewhere in this file) instead of
    // casting to unsigned long, which truncates on LLP64 platforms.
    fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n",
            flagname, value);
    return false;
  }
  return true;
}
99 DEFINE_uint64(seed
, 2341234, "Seed for PRNG");
100 static const bool FLAGS_seed_dummy
__attribute__((__unused__
)) =
101 RegisterFlagValidator(&FLAGS_seed
, &ValidateUint32Range
);
103 DEFINE_int64(max_key
, 1 * KB
* KB
,
104 "Max number of key/values to place in database");
106 DEFINE_int32(column_families
, 10, "Number of column families");
110 "The path to a RocksDB options file. If specified, then db_stress will "
111 "run with the RocksDB options in the default column family of the "
112 "specified options file. Note that, when an options file is provided, "
113 "db_stress will ignore the flag values for all options that may be passed "
114 "via options file.");
118 "Number of keys in active span of the key-range at any given time. The "
119 "span begins with its left endpoint at key 0, gradually moves rightwards, "
120 "and ends with its right endpoint at max_key. If set to 0, active_width "
121 "will be sanitized to be equal to max_key.");
123 // TODO(noetzli) Add support for single deletes
124 DEFINE_bool(test_batches_snapshots
, false,
125 "If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
126 " which read/write/delete multiple keys in a batch. In this mode,"
127 " we do not verify db content by comparing the content with the "
128 "pre-allocated array. Instead, we do partial verification inside"
129 " MultiGet() by checking various values in a batch. Benefit of"
131 "\t(a) No need to acquire mutexes during writes (less cache "
132 "flushes in multi-core leading to speed up)\n"
133 "\t(b) No long validation at the end (more speed up)\n"
134 "\t(c) Test snapshot and atomicity of batch writes");
136 DEFINE_int32(threads
, 32, "Number of concurrent threads to run.");
138 DEFINE_int32(ttl
, -1,
139 "Opens the db with this ttl value if this is not -1. "
140 "Carefully specify a large value such that verifications on "
141 "deleted values don't fail");
143 DEFINE_int32(value_size_mult
, 8,
144 "Size of value will be this number times rand_int(1,3) bytes");
146 DEFINE_int32(compaction_readahead_size
, 0, "Compaction readahead size");
148 DEFINE_bool(enable_pipelined_write
, false, "Pipeline WAL/memtable writes");
150 DEFINE_bool(verify_before_write
, false, "Verify before write");
152 DEFINE_bool(histogram
, false, "Print histogram of operation timings");
154 DEFINE_bool(destroy_db_initially
, true,
155 "Destroys the database dir before start if this is true");
157 DEFINE_bool(verbose
, false, "Verbose");
159 DEFINE_bool(progress_reports
, true,
160 "If true, db_stress will report number of finished operations");
162 DEFINE_uint64(db_write_buffer_size
, rocksdb::Options().db_write_buffer_size
,
163 "Number of bytes to buffer in all memtables before compacting");
165 DEFINE_int32(write_buffer_size
,
166 static_cast<int32_t>(rocksdb::Options().write_buffer_size
),
167 "Number of bytes to buffer in memtable before compacting");
169 DEFINE_int32(max_write_buffer_number
,
170 rocksdb::Options().max_write_buffer_number
,
171 "The number of in-memory memtables. "
172 "Each memtable is of size FLAGS_write_buffer_size.");
174 DEFINE_int32(min_write_buffer_number_to_merge
,
175 rocksdb::Options().min_write_buffer_number_to_merge
,
176 "The minimum number of write buffers that will be merged together "
177 "before writing to storage. This is cheap because it is an "
178 "in-memory merge. If this feature is not enabled, then all these "
179 "write buffers are flushed to L0 as separate files and this "
180 "increases read amplification because a get request has to check "
181 "in all of these files. Also, an in-memory merge may result in "
182 "writing less data to storage if there are duplicate records in"
183 " each of these individual write buffers.");
185 DEFINE_int32(max_write_buffer_number_to_maintain
,
186 rocksdb::Options().max_write_buffer_number_to_maintain
,
187 "The total maximum number of write buffers to maintain in memory "
188 "including copies of buffers that have already been flushed. "
189 "Unlike max_write_buffer_number, this parameter does not affect "
190 "flushing. This controls the minimum amount of write history "
191 "that will be available in memory for conflict checking when "
192 "Transactions are used. If this value is too low, some "
193 "transactions may fail at commit time due to not being able to "
194 "determine whether there were any write conflicts. Setting this "
195 "value to 0 will cause write buffers to be freed immediately "
196 "after they are flushed. If this value is set to -1, "
197 "'max_write_buffer_number' will be used.");
199 DEFINE_double(memtable_prefix_bloom_size_ratio
,
200 rocksdb::Options().memtable_prefix_bloom_size_ratio
,
201 "creates prefix blooms for memtables, each with size "
202 "`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
204 DEFINE_int32(open_files
, rocksdb::Options().max_open_files
,
205 "Maximum number of files to keep open at the same time "
206 "(use default if == 0)");
208 DEFINE_int64(compressed_cache_size
, -1,
209 "Number of bytes to use as a cache of compressed data."
210 " Negative means use default settings.");
212 DEFINE_int32(compaction_style
, rocksdb::Options().compaction_style
, "");
214 DEFINE_int32(level0_file_num_compaction_trigger
,
215 rocksdb::Options().level0_file_num_compaction_trigger
,
216 "Level0 compaction start trigger");
218 DEFINE_int32(level0_slowdown_writes_trigger
,
219 rocksdb::Options().level0_slowdown_writes_trigger
,
220 "Number of files in level-0 that will slow down writes");
222 DEFINE_int32(level0_stop_writes_trigger
,
223 rocksdb::Options().level0_stop_writes_trigger
,
224 "Number of files in level-0 that will trigger put stop.");
226 DEFINE_int32(block_size
,
227 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size
),
228 "Number of bytes in a block.");
232 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version
),
233 "Format version of SST files.");
235 DEFINE_int32(index_block_restart_interval
,
236 rocksdb::BlockBasedTableOptions().index_block_restart_interval
,
237 "Number of keys between restart points "
238 "for delta encoding of keys in index block.");
240 DEFINE_int32(max_background_compactions
,
241 rocksdb::Options().max_background_compactions
,
242 "The maximum number of concurrent background compactions "
243 "that can occur in parallel.");
245 DEFINE_int32(num_bottom_pri_threads
, 0,
246 "The number of threads in the bottom-priority thread pool (used "
247 "by universal compaction only).");
249 DEFINE_int32(compaction_thread_pool_adjust_interval
, 0,
250 "The interval (in milliseconds) to adjust compaction thread pool "
251 "size. Don't change it periodically if the value is 0.");
253 DEFINE_int32(compaction_thread_pool_variations
, 2,
254 "Range of background thread pool size variations when adjusted "
257 DEFINE_int32(max_background_flushes
, rocksdb::Options().max_background_flushes
,
258 "The maximum number of concurrent background flushes "
259 "that can occur in parallel.");
261 DEFINE_int32(universal_size_ratio
, 0, "The ratio of file sizes that trigger"
262 " compaction in universal style");
264 DEFINE_int32(universal_min_merge_width
, 0, "The minimum number of files to "
265 "compact in universal style compaction");
267 DEFINE_int32(universal_max_merge_width
, 0, "The max number of files to compact"
268 " in universal style compaction");
270 DEFINE_int32(universal_max_size_amplification_percent
, 0,
271 "The max size amplification for universal style compaction");
273 DEFINE_int32(clear_column_family_one_in
, 1000000,
274 "With a chance of 1/N, delete a column family and then recreate "
275 "it again. If N == 0, never drop/create column families. "
276 "When test_batches_snapshots is true, this flag has no effect");
278 DEFINE_int32(set_options_one_in
, 0,
279 "With a chance of 1/N, change some random options");
281 DEFINE_int32(set_in_place_one_in
, 0,
282 "With a chance of 1/N, toggle in place support option");
284 DEFINE_int64(cache_size
, 2LL * KB
* KB
* KB
,
285 "Number of bytes to use as a cache of uncompressed data.");
287 DEFINE_bool(use_clock_cache
, false,
288 "Replace default LRU block cache with clock cache.");
290 DEFINE_uint64(subcompactions
, 1,
291 "Maximum number of subcompactions to divide L0-L1 compactions "
294 DEFINE_bool(allow_concurrent_memtable_write
, false,
295 "Allow multi-writers to update mem tables in parallel.");
297 DEFINE_bool(enable_write_thread_adaptive_yield
, true,
298 "Use a yielding spin loop for brief writer thread waits.");
300 static const bool FLAGS_subcompactions_dummy
__attribute__((__unused__
)) =
301 RegisterFlagValidator(&FLAGS_subcompactions
, &ValidateUint32Range
);
// Validates that an int32 flag value is non-negative.
// Registered as a gflags flag validator; returning false rejects the
// flag value at parse time.
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
  if (value < 0) {
    fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n", flagname,
            value);
    return false;
  }
  return true;
}
311 DEFINE_int32(reopen
, 10, "Number of times database reopens");
312 static const bool FLAGS_reopen_dummy
__attribute__((__unused__
)) =
313 RegisterFlagValidator(&FLAGS_reopen
, &ValidateInt32Positive
);
315 DEFINE_int32(bloom_bits
, 10, "Bloom filter bits per key. "
316 "Negative means use default settings.");
318 DEFINE_bool(use_block_based_filter
, false, "use block based filter"
319 "instead of full filter for block based table");
321 DEFINE_string(db
, "", "Use the db with the following name.");
324 expected_values_path
, "",
325 "File where the array of expected uint32_t values will be stored. If "
326 "provided and non-empty, the DB state will be verified against these "
327 "values after recovery. --max_key and --column_family must be kept the "
328 "same across invocations of this program that use the same "
329 "--expected_values_path.");
331 DEFINE_bool(verify_checksum
, false,
332 "Verify checksum for every block read from storage");
334 DEFINE_bool(mmap_read
, rocksdb::Options().allow_mmap_reads
,
335 "Allow reads to occur via mmap-ing files");
337 DEFINE_bool(mmap_write
, rocksdb::Options().allow_mmap_writes
,
338 "Allow writes to occur via mmap-ing files");
340 DEFINE_bool(use_direct_reads
, rocksdb::Options().use_direct_reads
,
341 "Use O_DIRECT for reading data");
343 DEFINE_bool(use_direct_io_for_flush_and_compaction
,
344 rocksdb::Options().use_direct_io_for_flush_and_compaction
,
345 "Use O_DIRECT for writing data");
347 // Database statistics
348 static std::shared_ptr
<rocksdb::Statistics
> dbstats
;
349 DEFINE_bool(statistics
, false, "Create database statistics");
351 DEFINE_bool(sync
, false, "Sync all writes to disk");
353 DEFINE_bool(use_fsync
, false, "If true, issue fsync instead of fdatasync");
355 DEFINE_int32(kill_random_test
, 0,
356 "If non-zero, kill at various points in source code with "
357 "probability 1/this");
358 static const bool FLAGS_kill_random_test_dummy
__attribute__((__unused__
)) =
359 RegisterFlagValidator(&FLAGS_kill_random_test
, &ValidateInt32Positive
);
360 extern int rocksdb_kill_odds
;
362 DEFINE_string(kill_prefix_blacklist
, "",
363 "If non-empty, kill points with prefix in the list given will be"
364 " skipped. Items are comma-separated.");
365 extern std::vector
<std::string
> rocksdb_kill_prefix_blacklist
;
367 DEFINE_bool(disable_wal
, false, "If true, do not write WAL for write.");
369 DEFINE_int64(target_file_size_base
, rocksdb::Options().target_file_size_base
,
370 "Target level-1 file size for compaction");
372 DEFINE_int32(target_file_size_multiplier
, 1,
373 "A multiplier to compute target level-N file size (N >= 2)");
375 DEFINE_uint64(max_bytes_for_level_base
,
376 rocksdb::Options().max_bytes_for_level_base
,
377 "Max bytes for level-1");
379 DEFINE_double(max_bytes_for_level_multiplier
, 2,
380 "A multiplier to compute max bytes for level-N (N >= 2)");
382 DEFINE_int32(range_deletion_width
, 10,
383 "The width of the range deletion intervals.");
385 DEFINE_uint64(rate_limiter_bytes_per_sec
, 0, "Set options.rate_limiter value.");
387 DEFINE_bool(rate_limit_bg_reads
, false,
388 "Use options.rate_limiter on compaction reads");
390 DEFINE_bool(use_txn
, false,
391 "Use TransactionDB. Currently the default write policy is "
392 "TxnDBWritePolicy::WRITE_PREPARED");
394 DEFINE_int32(backup_one_in
, 0,
395 "If non-zero, then CreateNewBackup() will be called once for "
396 "every N operations on average. 0 indicates CreateNewBackup() "
399 DEFINE_int32(checkpoint_one_in
, 0,
400 "If non-zero, then CreateCheckpoint() will be called once for "
401 "every N operations on average. 0 indicates CreateCheckpoint() "
404 DEFINE_int32(ingest_external_file_one_in
, 0,
405 "If non-zero, then IngestExternalFile() will be called once for "
406 "every N operations on average. 0 indicates IngestExternalFile() "
409 DEFINE_int32(ingest_external_file_width
, 1000,
410 "The width of the ingested external files.");
412 DEFINE_int32(compact_files_one_in
, 0,
413 "If non-zero, then CompactFiles() will be called once for every N "
414 "operations on average. 0 indicates CompactFiles() is disabled.");
416 DEFINE_int32(compact_range_one_in
, 0,
417 "If non-zero, then CompactRange() will be called once for every N "
418 "operations on average. 0 indicates CompactRange() is disabled.");
420 DEFINE_int32(flush_one_in
, 0,
421 "If non-zero, then Flush() will be called once for every N ops "
422 "on average. 0 indicates calls to Flush() are disabled.");
424 DEFINE_int32(compact_range_width
, 10000,
425 "The width of the ranges passed to CompactRange().");
427 DEFINE_int32(acquire_snapshot_one_in
, 0,
428 "If non-zero, then acquires a snapshot once every N operations on "
431 DEFINE_bool(compare_full_db_state_snapshot
, false,
432 "If set we compare state of entire db (in one of the threads) with"
435 DEFINE_uint64(snapshot_hold_ops
, 0,
436 "If non-zero, then releases snapshots N operations after they're "
// Validates that an int32 flag value is a percentage in [0, 100].
// Registered as a gflags flag validator; returning false rejects the
// flag value at parse time.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value < 0 || value > 100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname,
            value);
    return false;
  }
  return true;
}
448 DEFINE_int32(readpercent
, 10,
449 "Ratio of reads to total workload (expressed as a percentage)");
450 static const bool FLAGS_readpercent_dummy
__attribute__((__unused__
)) =
451 RegisterFlagValidator(&FLAGS_readpercent
, &ValidateInt32Percent
);
453 DEFINE_int32(prefixpercent
, 20,
454 "Ratio of prefix iterators to total workload (expressed as a"
456 static const bool FLAGS_prefixpercent_dummy
__attribute__((__unused__
)) =
457 RegisterFlagValidator(&FLAGS_prefixpercent
, &ValidateInt32Percent
);
459 DEFINE_int32(writepercent
, 45,
460 "Ratio of writes to total workload (expressed as a percentage)");
461 static const bool FLAGS_writepercent_dummy
__attribute__((__unused__
)) =
462 RegisterFlagValidator(&FLAGS_writepercent
, &ValidateInt32Percent
);
464 DEFINE_int32(delpercent
, 15,
465 "Ratio of deletes to total workload (expressed as a percentage)");
466 static const bool FLAGS_delpercent_dummy
__attribute__((__unused__
)) =
467 RegisterFlagValidator(&FLAGS_delpercent
, &ValidateInt32Percent
);
469 DEFINE_int32(delrangepercent
, 0,
470 "Ratio of range deletions to total workload (expressed as a "
471 "percentage). Cannot be used with test_batches_snapshots");
472 static const bool FLAGS_delrangepercent_dummy
__attribute__((__unused__
)) =
473 RegisterFlagValidator(&FLAGS_delrangepercent
, &ValidateInt32Percent
);
475 DEFINE_int32(nooverwritepercent
, 60,
476 "Ratio of keys without overwrite to total workload (expressed as "
478 static const bool FLAGS_nooverwritepercent_dummy
__attribute__((__unused__
)) =
479 RegisterFlagValidator(&FLAGS_nooverwritepercent
, &ValidateInt32Percent
);
481 DEFINE_int32(iterpercent
, 10, "Ratio of iterations to total workload"
482 " (expressed as a percentage)");
483 static const bool FLAGS_iterpercent_dummy
__attribute__((__unused__
)) =
484 RegisterFlagValidator(&FLAGS_iterpercent
, &ValidateInt32Percent
);
486 DEFINE_uint64(num_iterations
, 10, "Number of iterations per MultiIterate run");
487 static const bool FLAGS_num_iterations_dummy
__attribute__((__unused__
)) =
488 RegisterFlagValidator(&FLAGS_num_iterations
, &ValidateUint32Range
);
491 enum rocksdb::CompressionType
StringToCompressionType(const char* ctype
) {
494 if (!strcasecmp(ctype
, "none"))
495 return rocksdb::kNoCompression
;
496 else if (!strcasecmp(ctype
, "snappy"))
497 return rocksdb::kSnappyCompression
;
498 else if (!strcasecmp(ctype
, "zlib"))
499 return rocksdb::kZlibCompression
;
500 else if (!strcasecmp(ctype
, "bzip2"))
501 return rocksdb::kBZip2Compression
;
502 else if (!strcasecmp(ctype
, "lz4"))
503 return rocksdb::kLZ4Compression
;
504 else if (!strcasecmp(ctype
, "lz4hc"))
505 return rocksdb::kLZ4HCCompression
;
506 else if (!strcasecmp(ctype
, "xpress"))
507 return rocksdb::kXpressCompression
;
508 else if (!strcasecmp(ctype
, "zstd"))
509 return rocksdb::kZSTD
;
511 fprintf(stderr
, "Cannot parse compression type '%s'\n", ctype
);
512 return rocksdb::kSnappyCompression
; //default value
515 enum rocksdb::ChecksumType
StringToChecksumType(const char* ctype
) {
517 auto iter
= rocksdb::checksum_type_string_map
.find(ctype
);
518 if (iter
!= rocksdb::checksum_type_string_map
.end()) {
521 fprintf(stderr
, "Cannot parse checksum type '%s'\n", ctype
);
522 return rocksdb::kCRC32c
;
525 std::string
ChecksumTypeToString(rocksdb::ChecksumType ctype
) {
526 auto iter
= std::find_if(
527 rocksdb::checksum_type_string_map
.begin(),
528 rocksdb::checksum_type_string_map
.end(),
529 [&](const std::pair
<std::string
, rocksdb::ChecksumType
>&
530 name_and_enum_val
) { return name_and_enum_val
.second
== ctype
; });
531 assert(iter
!= rocksdb::checksum_type_string_map
.end());
// Splits a comma-separated string into its fields. An empty input yields
// an empty vector. Every comma produces a field boundary, so consecutive
// or trailing commas yield empty fields (e.g. "a,,b," -> {"a","","b",""}).
std::vector<std::string> SplitString(std::string src) {
  std::vector<std::string> ret;
  if (src.empty()) {
    return ret;
  }
  size_t pos = 0;
  size_t pos_comma = 0;
  while ((pos_comma = src.find(',', pos)) != std::string::npos) {
    ret.push_back(src.substr(pos, pos_comma - pos));
    pos = pos_comma + 1;
  }
  // Final field: substr(pos) takes everything to the end, equivalent to
  // the clunkier substr(pos, src.length()).
  ret.push_back(src.substr(pos));
  return ret;
}
551 DEFINE_string(compression_type
, "snappy",
552 "Algorithm to use to compress the database");
553 static enum rocksdb::CompressionType FLAGS_compression_type_e
=
554 rocksdb::kSnappyCompression
;
556 DEFINE_int32(compression_max_dict_bytes
, 0,
557 "Maximum size of dictionary used to prime the compression "
560 DEFINE_int32(compression_zstd_max_train_bytes
, 0,
561 "Maximum size of training data passed to zstd's dictionary "
564 DEFINE_string(checksum_type
, "kCRC32c", "Algorithm to use to checksum blocks");
565 static enum rocksdb::ChecksumType FLAGS_checksum_type_e
= rocksdb::kCRC32c
;
567 DEFINE_string(hdfs
, "", "Name of hdfs environment");
568 // posix or hdfs environment
569 static rocksdb::Env
* FLAGS_env
= rocksdb::Env::Default();
571 DEFINE_uint64(ops_per_thread
, 1200000, "Number of operations per thread.");
572 static const bool FLAGS_ops_per_thread_dummy
__attribute__((__unused__
)) =
573 RegisterFlagValidator(&FLAGS_ops_per_thread
, &ValidateUint32Range
);
575 DEFINE_uint64(log2_keys_per_lock
, 2, "Log2 of number of keys per lock");
576 static const bool FLAGS_log2_keys_per_lock_dummy
__attribute__((__unused__
)) =
577 RegisterFlagValidator(&FLAGS_log2_keys_per_lock
, &ValidateUint32Range
);
579 DEFINE_uint64(max_manifest_file_size
, 16384, "Maximum size of a MANIFEST file");
581 DEFINE_bool(in_place_update
, false, "On true, does inplace update in memtable");
590 enum RepFactory
StringToRepFactory(const char* ctype
) {
593 if (!strcasecmp(ctype
, "skip_list"))
595 else if (!strcasecmp(ctype
, "prefix_hash"))
596 return kHashSkipList
;
597 else if (!strcasecmp(ctype
, "vector"))
600 fprintf(stdout
, "Cannot parse memreptable %s\n", ctype
);
605 #pragma warning(push)
606 // truncation of constant value on static_cast
607 #pragma warning(disable : 4309)
609 bool GetNextPrefix(const rocksdb::Slice
& src
, std::string
* v
) {
610 std::string ret
= src
.ToString();
611 for (int i
= static_cast<int>(ret
.size()) - 1; i
>= 0; i
--) {
612 if (ret
[i
] != static_cast<char>(255)) {
618 // all FF. No next prefix
630 static enum RepFactory FLAGS_rep_factory
;
631 DEFINE_string(memtablerep
, "prefix_hash", "");
633 static bool ValidatePrefixSize(const char* flagname
, int32_t value
) {
634 if (value
< 0 || value
> 8) {
635 fprintf(stderr
, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n",
641 DEFINE_int32(prefix_size
, 7, "Control the prefix size for HashSkipListRep");
642 static const bool FLAGS_prefix_size_dummy
__attribute__((__unused__
)) =
643 RegisterFlagValidator(&FLAGS_prefix_size
, &ValidatePrefixSize
);
645 DEFINE_bool(use_merge
, false, "On true, replaces all writes with a Merge "
646 "that behaves like a Put");
648 DEFINE_bool(use_full_merge_v1
, false,
649 "On true, use a merge operator that implement the deprecated "
650 "version of FullMerge");
// convert long to a big-endian slice key
//
// Encodes `val` as its 8-byte big-endian representation so that the
// byte-wise (lexicographic) order of keys matches numeric order for
// non-negative values.
static std::string Key(int64_t val) {
  // Build the big-endian bytes directly instead of encoding little-endian
  // with PutFixed64 into one string and reversing into a second: one
  // allocation, no project helper, identical output bytes.
  std::string big_endian_key(sizeof(val), '\0');
  const uint64_t bits = static_cast<uint64_t>(val);
  for (size_t i = 0; i < sizeof(val); ++i) {
    big_endian_key[i] =
        static_cast<char>((bits >> (8 * (sizeof(val) - 1 - i))) & 0xff);
  }
  return big_endian_key;
}
// Decodes the 8-byte big-endian key produced by Key() back into *key_p.
// Returns true on success. Defect fixed: when the size assert was
// compiled out (NDEBUG), an undersized input indexed the string out of
// bounds; now a size mismatch returns false instead. Also decodes
// big-endian bytes directly rather than reversing into a second string
// and round-tripping through Slice/GetFixed64.
static bool GetIntVal(std::string big_endian_key, uint64_t *key_p) {
  const size_t size_key = sizeof(*key_p);
  assert(big_endian_key.size() == size_key);
  if (big_endian_key.size() != size_key) {
    return false;  // defensive: avoid out-of-bounds reads on bad input
  }
  uint64_t result = 0;
  for (size_t i = 0; i < size_key; ++i) {
    result = (result << 8) | static_cast<unsigned char>(big_endian_key[i]);
  }
  *key_p = result;
  return true;
}
679 static std::string
StringToHex(const std::string
& str
) {
680 std::string result
= "0x";
681 result
.append(Slice(str
).ToString(true));
699 size_t single_deletes_
;
700 long iterator_size_sums_
;
703 long range_deletions_
;
704 long covered_by_range_deletions_
;
706 long num_compact_files_succeed_
;
707 long num_compact_files_failed_
;
710 uint64_t last_op_finish_
;
725 iterator_size_sums_
= 0;
728 range_deletions_
= 0;
729 covered_by_range_deletions_
= 0;
733 num_compact_files_succeed_
= 0;
734 num_compact_files_failed_
= 0;
735 start_
= FLAGS_env
->NowMicros();
736 last_op_finish_
= start_
;
740 void Merge(const Stats
& other
) {
741 hist_
.Merge(other
.hist_
);
742 done_
+= other
.done_
;
743 gets_
+= other
.gets_
;
744 prefixes_
+= other
.prefixes_
;
745 writes_
+= other
.writes_
;
746 deletes_
+= other
.deletes_
;
747 single_deletes_
+= other
.single_deletes_
;
748 iterator_size_sums_
+= other
.iterator_size_sums_
;
749 founds_
+= other
.founds_
;
750 iterations_
+= other
.iterations_
;
751 range_deletions_
+= other
.range_deletions_
;
752 covered_by_range_deletions_
= other
.covered_by_range_deletions_
;
753 errors_
+= other
.errors_
;
754 bytes_
+= other
.bytes_
;
755 seconds_
+= other
.seconds_
;
756 num_compact_files_succeed_
+= other
.num_compact_files_succeed_
;
757 num_compact_files_failed_
+= other
.num_compact_files_failed_
;
758 if (other
.start_
< start_
) start_
= other
.start_
;
759 if (other
.finish_
> finish_
) finish_
= other
.finish_
;
763 finish_
= FLAGS_env
->NowMicros();
764 seconds_
= (finish_
- start_
) * 1e-6;
767 void FinishedSingleOp() {
768 if (FLAGS_histogram
) {
769 auto now
= FLAGS_env
->NowMicros();
770 auto micros
= now
- last_op_finish_
;
772 if (micros
> 20000) {
773 fprintf(stdout
, "long op: %" PRIu64
" micros%30s\r", micros
, "");
775 last_op_finish_
= now
;
779 if (FLAGS_progress_reports
) {
780 if (done_
>= next_report_
) {
781 if (next_report_
< 1000) next_report_
+= 100;
782 else if (next_report_
< 5000) next_report_
+= 500;
783 else if (next_report_
< 10000) next_report_
+= 1000;
784 else if (next_report_
< 50000) next_report_
+= 5000;
785 else if (next_report_
< 100000) next_report_
+= 10000;
786 else if (next_report_
< 500000) next_report_
+= 50000;
787 else next_report_
+= 100000;
788 fprintf(stdout
, "... finished %ld ops%30s\r", done_
, "");
793 void AddBytesForWrites(int nwrites
, size_t nbytes
) {
798 void AddGets(int ngets
, int nfounds
) {
803 void AddPrefixes(int nprefixes
, int count
) {
804 prefixes_
+= nprefixes
;
805 iterator_size_sums_
+= count
;
808 void AddIterations(int n
) {
812 void AddDeletes(int n
) {
816 void AddSingleDeletes(size_t n
) { single_deletes_
+= n
; }
818 void AddRangeDeletions(int n
) {
819 range_deletions_
+= n
;
822 void AddCoveredByRangeDeletions(int n
) {
823 covered_by_range_deletions_
+= n
;
826 void AddErrors(int n
) {
830 void AddNumCompactFilesSucceed(int n
) { num_compact_files_succeed_
+= n
; }
832 void AddNumCompactFilesFailed(int n
) { num_compact_files_failed_
+= n
; }
834 void Report(const char* name
) {
836 if (bytes_
< 1 || done_
< 1) {
837 fprintf(stderr
, "No writes or ops?\n");
841 double elapsed
= (finish_
- start_
) * 1e-6;
842 double bytes_mb
= bytes_
/ 1048576.0;
843 double rate
= bytes_mb
/ elapsed
;
844 double throughput
= (double)done_
/elapsed
;
846 fprintf(stdout
, "%-12s: ", name
);
847 fprintf(stdout
, "%.3f micros/op %ld ops/sec\n",
848 seconds_
* 1e6
/ done_
, (long)throughput
);
849 fprintf(stdout
, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
850 "", bytes_mb
, rate
, (100*writes_
)/done_
, done_
);
851 fprintf(stdout
, "%-12s: Wrote %ld times\n", "", writes_
);
852 fprintf(stdout
, "%-12s: Deleted %ld times\n", "", deletes_
);
853 fprintf(stdout
, "%-12s: Single deleted %" ROCKSDB_PRIszt
" times\n", "",
855 fprintf(stdout
, "%-12s: %ld read and %ld found the key\n", "",
857 fprintf(stdout
, "%-12s: Prefix scanned %ld times\n", "", prefixes_
);
858 fprintf(stdout
, "%-12s: Iterator size sum is %ld\n", "",
859 iterator_size_sums_
);
860 fprintf(stdout
, "%-12s: Iterated %ld times\n", "", iterations_
);
861 fprintf(stdout
, "%-12s: Deleted %ld key-ranges\n", "", range_deletions_
);
862 fprintf(stdout
, "%-12s: Range deletions covered %ld keys\n", "",
863 covered_by_range_deletions_
);
865 fprintf(stdout
, "%-12s: Got errors %ld times\n", "", errors_
);
866 fprintf(stdout
, "%-12s: %ld CompactFiles() succeed\n", "",
867 num_compact_files_succeed_
);
868 fprintf(stdout
, "%-12s: %ld CompactFiles() did not succeed\n", "",
869 num_compact_files_failed_
);
871 if (FLAGS_histogram
) {
872 fprintf(stdout
, "Microseconds per op:\n%s\n", hist_
.ToString().c_str());
878 // State shared by all concurrent executions of the same benchmark.
881 // indicates a key may have any value (or not be present) as an operation on
883 static const uint32_t UNKNOWN_SENTINEL
;
884 // indicates a key should definitely be deleted
885 static const uint32_t DELETION_SENTINEL
;
887 explicit SharedState(StressTest
* stress_test
)
889 seed_(static_cast<uint32_t>(FLAGS_seed
)),
890 max_key_(FLAGS_max_key
),
891 log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock
)),
892 num_threads_(FLAGS_threads
),
898 start_verify_(false),
899 should_stop_bg_thread_(false),
900 bg_thread_finished_(false),
901 stress_test_(stress_test
),
902 verification_failure_(false),
903 no_overwrite_ids_(FLAGS_column_families
),
905 // Pick random keys in each column family that will not experience
908 printf("Choosing random keys with no overwrite\n");
910 // Start with the identity permutation. Subsequent iterations of
911 // for loop below will start with perm of previous for loop
912 int64_t *permutation
= new int64_t[max_key_
];
913 for (int64_t i
= 0; i
< max_key_
; i
++) {
916 // Now do the Knuth shuffle
917 int64_t num_no_overwrite_keys
= (max_key_
* FLAGS_nooverwritepercent
) / 100;
918 // Only need to figure out first num_no_overwrite_keys of permutation
919 no_overwrite_ids_
.reserve(num_no_overwrite_keys
);
920 for (int64_t i
= 0; i
< num_no_overwrite_keys
; i
++) {
921 int64_t rand_index
= i
+ rnd
.Next() % (max_key_
- i
);
922 // Swap i and rand_index;
923 int64_t temp
= permutation
[i
];
924 permutation
[i
] = permutation
[rand_index
];
925 permutation
[rand_index
] = temp
;
926 // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
928 no_overwrite_ids_
.insert(permutation
[i
]);
930 delete[] permutation
;
932 size_t expected_values_size
=
933 sizeof(std::atomic
<uint32_t>) * FLAGS_column_families
* max_key_
;
934 bool values_init_needed
= false;
936 if (!FLAGS_expected_values_path
.empty()) {
937 if (!std::atomic
<uint32_t>{}.is_lock_free()) {
938 status
= Status::InvalidArgument(
939 "Cannot use --expected_values_path on platforms without lock-free "
940 "std::atomic<uint32_t>");
942 if (status
.ok() && FLAGS_clear_column_family_one_in
> 0) {
943 status
= Status::InvalidArgument(
944 "Cannot use --expected_values_path on when "
945 "--clear_column_family_one_in is greater than zero.");
949 status
= FLAGS_env
->GetFileSize(FLAGS_expected_values_path
, &size
);
951 unique_ptr
<WritableFile
> wfile
;
952 if (status
.ok() && size
== 0) {
953 const EnvOptions soptions
;
954 status
= FLAGS_env
->NewWritableFile(FLAGS_expected_values_path
, &wfile
,
957 if (status
.ok() && size
== 0) {
958 std::string
buf(expected_values_size
, '\0');
959 status
= wfile
->Append(buf
);
960 values_init_needed
= true;
963 status
= FLAGS_env
->NewMemoryMappedFileBuffer(
964 FLAGS_expected_values_path
, &expected_mmap_buffer_
);
967 assert(expected_mmap_buffer_
->GetLen() == expected_values_size
);
969 static_cast<std::atomic
<uint32_t>*>(expected_mmap_buffer_
->GetBase());
970 assert(values_
!= nullptr);
972 fprintf(stderr
, "Failed opening shared file '%s' with error: %s\n",
973 FLAGS_expected_values_path
.c_str(), status
.ToString().c_str());
974 assert(values_
== nullptr);
977 if (values_
== nullptr) {
978 values_allocation_
.reset(
979 new std::atomic
<uint32_t>[FLAGS_column_families
* max_key_
]);
980 values_
= &values_allocation_
[0];
981 values_init_needed
= true;
983 assert(values_
!= nullptr);
984 if (values_init_needed
) {
985 for (int i
= 0; i
< FLAGS_column_families
; ++i
) {
986 for (int j
= 0; j
< max_key_
; ++j
) {
987 Delete(i
, j
, false /* pending */);
992 if (FLAGS_test_batches_snapshots
) {
993 fprintf(stdout
, "No lock creation because test_batches_snapshots set\n");
997 long num_locks
= static_cast<long>(max_key_
>> log2_keys_per_lock_
);
998 if (max_key_
& ((1 << log2_keys_per_lock_
) - 1)) {
1001 fprintf(stdout
, "Creating %ld locks\n", num_locks
* FLAGS_column_families
);
1002 key_locks_
.resize(FLAGS_column_families
);
1004 for (int i
= 0; i
< FLAGS_column_families
; ++i
) {
1005 key_locks_
[i
].resize(num_locks
);
1006 for (auto& ptr
: key_locks_
[i
]) {
1007 ptr
.reset(new port::Mutex
);
// --- Thread-coordination accessors for SharedState ---
// These expose the shared mutex/condvar and the barrier counters that the
// worker threads use to rendezvous between the init / operate / verify
// phases. NOTE(review): this extract is missing several physical lines
// (method bodies and closing braces); fragments below are kept verbatim.
1014 port::Mutex
* GetMutex() {
1018 port::CondVar
* GetCondVar() {
// Owning StressTest instance (non-owning pointer accessor).
1022 StressTest
* GetStressTest() const {
1023 return stress_test_
;
1026 int64_t GetMaxKey() const {
1030 uint32_t GetNumThreads() const {
1031 return num_threads_
;
1034 void IncInitialized() {
1038 void IncOperated() {
// Reopen voting: each thread increments modulo num_threads_, so the count
// returns to 0 exactly when every thread has voted.
1046 void IncVotedReopen() {
1047 vote_reopen_
= (vote_reopen_
+ 1) % num_threads_
;
1050 bool AllInitialized() const {
1051 return num_initialized_
>= num_threads_
;
1054 bool AllOperated() const {
1055 return num_populated_
>= num_threads_
;
1058 bool AllDone() const {
1059 return num_done_
>= num_threads_
;
// True when the vote counter has wrapped back to zero (all voted).
1062 bool AllVotedReopen() {
1063 return (vote_reopen_
== 0);
1070 void SetStartVerify() {
1071 start_verify_
= true;
1074 bool Started() const {
1078 bool VerifyStarted() const {
1079 return start_verify_
;
1082 void SetVerificationFailure() { verification_failure_
.store(true); }
1084 bool HasVerificationFailedYet() { return verification_failure_
.load(); }
// Return the striped lock guarding `key` in column family `cf`. Keys are
// grouped 2^log2_keys_per_lock_ per mutex (the shift selects the stripe).
// NOTE(review): the closing brace line is missing from this extract.
1086 port::Mutex
* GetMutexForKey(int cf
, int64_t key
) {
1087 return key_locks_
[cf
][key
>> log2_keys_per_lock_
].get();
// Acquire every key-stripe mutex of column family `cf` (used for whole-CF
// operations such as clearing). NOTE(review): the loop body (the Lock()
// call) and closing braces are missing from this extract.
1090 void LockColumnFamily(int cf
) {
1091 for (auto& mutex
: key_locks_
[cf
]) {
// Release every key-stripe mutex of column family `cf`; counterpart of
// LockColumnFamily. NOTE(review): the loop body (the Unlock() call) and
// closing braces are missing from this extract.
1096 void UnlockColumnFamily(int cf
) {
1097 for (auto& mutex
: key_locks_
[cf
]) {
// Slot in the flat expected-values array for (cf, key): the array is laid
// out as FLAGS_column_families consecutive runs of max_key_ entries.
// NOTE(review): the closing brace line is missing from this extract.
1102 std::atomic
<uint32_t>& Value(int cf
, int64_t key
) const {
1103 return values_
[cf
* max_key_
+ key
];
// Reset the expected values of an entire column family by filling its
// [Value(cf,0), Value(cf+1,0)) slice. NOTE(review): the fill value
// (presumably DELETION_SENTINEL) and closing lines are missing here —
// confirm against upstream.
1106 void ClearColumnFamily(int cf
) {
1107 std::fill(&Value(cf
, 0 /* key */), &Value(cf
+ 1, 0 /* key */),
1111 // @param pending True if the update may have started but is not yet
1112 // guaranteed finished. This is useful for crash-recovery testing when the
1113 // process may crash before updating the expected values array.
// Record an expected Put of `value_base` at (cf, key). When `pending` is
// true the slot is set to UNKNOWN_SENTINEL instead: the real DB write may
// or may not have completed before a crash, so recovery verification must
// accept either outcome. The release fences order the expected-value store
// relative to the surrounding DB Write (see the inline comments).
// NOTE(review): several guard lines (e.g. a !pending fast path) appear to
// be missing from this extract — confirm against upstream before editing.
1114 void Put(int cf
, int64_t key
, uint32_t value_base
, bool pending
) {
1116 // prevent expected-value update from reordering before Write
1117 std::atomic_thread_fence(std::memory_order_release
);
1119 Value(cf
, key
).store(pending
? UNKNOWN_SENTINEL
: value_base
,
1120 std::memory_order_relaxed
);
1122 // prevent Write from reordering before expected-value update
1123 std::atomic_thread_fence(std::memory_order_release
);
1127 uint32_t Get(int cf
, int64_t key
) const { return Value(cf
, key
); }
1129 // @param pending See comment above Put()
1130 // Returns true if the key was not yet deleted.
// Record an expected Delete of (cf, key); returns true iff the key was not
// already deleted. Implemented as a Put of DELETION_SENTINEL so the
// `pending` crash-recovery semantics match Put().
// NOTE(review): the early `return false;`, final `return true;`, and
// closing braces are missing from this extract.
1131 bool Delete(int cf
, int64_t key
, bool pending
) {
1132 if (Value(cf
, key
) == DELETION_SENTINEL
) {
1135 Put(cf
, key
, DELETION_SENTINEL
, pending
);
1139 // @param pending See comment above Put()
1140 // Returns true if the key was not yet deleted.
// Expected-state bookkeeping for SingleDelete is identical to Delete:
// both just mark the slot with DELETION_SENTINEL.
// NOTE(review): the closing brace line is missing from this extract.
1141 bool SingleDelete(int cf
, int64_t key
, bool pending
) {
1142 return Delete(cf
, key
, pending
);
1145 // @param pending See comment above Put()
1146 // Returns number of keys deleted by the call.
// Record an expected DeleteRange over [begin_key, end_key) by deleting each
// key individually; returns how many keys were actually (newly) deleted.
// NOTE(review): the counter declaration/increment and the return statement
// are missing from this extract — confirm against upstream.
1147 int DeleteRange(int cf
, int64_t begin_key
, int64_t end_key
, bool pending
) {
1149 for (int64_t key
= begin_key
; key
< end_key
; ++key
) {
1150 if (Delete(cf
, key
, pending
)) {
// A key may be overwritten iff it was not chosen for the no-overwrite set
// built in the constructor (those keys are reserved for SingleDelete
// testing). NOTE(review): the closing brace line is missing here.
1157 bool AllowsOverwrite(int64_t key
) {
1158 return no_overwrite_ids_
.find(key
) == no_overwrite_ids_
.end();
// True iff (cf, key) is expected to exist in the DB. Anything other than
// DELETION_SENTINEL — including UNKNOWN_SENTINEL — counts as existing;
// the rationale is in the original comments below.
// NOTE(review): the closing brace line is missing from this extract.
1161 bool Exists(int cf
, int64_t key
) {
1162 // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
1163 // is disallowed can't be accidentally added a second time, in which case
1164 // SingleDelete wouldn't be able to properly delete the key. It does allow
1165 // the case where a SingleDelete might be added which covers nothing, but
1166 // that's not a correctness issue.
1167 uint32_t expected_value
= Value(cf
, key
).load();
1168 return expected_value
!= DELETION_SENTINEL
;
1171 uint32_t GetSeed() const { return seed_
; }
1173 void SetShouldStopBgThread() { should_stop_bg_thread_
= true; }
1175 bool ShoudStopBgThread() { return should_stop_bg_thread_
; }
1177 void SetBgThreadFinish() { bg_thread_finished_
= true; }
1179 bool BgThreadFinished() const { return bg_thread_finished_
; }
1181 bool ShouldVerifyAtBeginning() const {
1182 return expected_mmap_buffer_
.get() != nullptr;
1188 const uint32_t seed_
;
1189 const int64_t max_key_
;
1190 const uint32_t log2_keys_per_lock_
;
1191 const int num_threads_
;
1192 long num_initialized_
;
1193 long num_populated_
;
1198 bool should_stop_bg_thread_
;
1199 bool bg_thread_finished_
;
1200 StressTest
* stress_test_
;
1201 std::atomic
<bool> verification_failure_
;
1203 // Keys that should not be overwritten
1204 std::unordered_set
<size_t> no_overwrite_ids_
;
1206 std::atomic
<uint32_t>* values_
;
1207 std::unique_ptr
<std::atomic
<uint32_t>[]> values_allocation_
;
1208 // Has to make it owned by a smart ptr as port::Mutex is not copyable
1209 // and storing it in the container may require copying depending on the impl.
1210 std::vector
<std::vector
<std::unique_ptr
<port::Mutex
> > > key_locks_
;
1211 std::unique_ptr
<MemoryMappedFileBuffer
> expected_mmap_buffer_
;
// Out-of-class definitions for SharedState's sentinel values stored in the
// expected-values array: UNKNOWN_SENTINEL marks a slot whose pending update
// may or may not have reached the DB before a crash; DELETION_SENTINEL
// marks a deleted key. Real value bases must avoid both (top two uint32s).
1214 const uint32_t SharedState::UNKNOWN_SENTINEL
= 0xfffffffe;
1215 const uint32_t SharedState::DELETION_SENTINEL
= 0xffffffff;
1217 // Per-thread state for concurrent executions of the same benchmark.
// Per-thread state for concurrent executions of the same benchmark.
// NOTE(review): several member declarations (content lines 1222, 1226,
// 1230, 1232, 1234, 1237, 1239 — e.g. the cf index, key, Get status/value,
// stats member) and the closing braces are missing from this extract.
1218 struct ThreadState
{
1219 uint32_t tid
; // 0..n-1
1220 Random rand
; // Has different seeds for different threads
1221 SharedState
* shared
;
// State captured when a snapshot is taken, so a later read at the same
// snapshot can be compared against what was seen at acquisition time.
1223 struct SnapshotState
{
1224 const Snapshot
* snapshot
;
1225 // The cf from which we did a Get at this snapshot
1227 // The name of the cf at the time that we did a read
1228 std::string cf_at_name
;
1229 // The key with which we did a Get at this snapshot
1231 // The status of the Get
1233 // The value of the Get
1235 // optional state of all keys in the db
1236 std::vector
<bool> *key_vec
;
// Snapshots pending release, keyed by the operation index at which each
// should be verified and released (see OperateDb).
1238 std::queue
<std::pair
<uint64_t, SnapshotState
> > snapshot_queue
;
// Thread RNG is seeded from the shared seed plus the thread index so each
// thread gets a distinct, reproducible stream.
1240 ThreadState(uint32_t index
, SharedState
* _shared
)
1241 : tid(index
), rand(1000 + index
+ _shared
->GetSeed()), shared(_shared
) {}
1244 class DbStressListener
: public EventListener
{
1246 DbStressListener(const std::string
& db_name
,
1247 const std::vector
<DbPath
>& db_paths
,
1248 const std::vector
<ColumnFamilyDescriptor
>& column_families
)
1249 : db_name_(db_name
),
1250 db_paths_(db_paths
),
1251 column_families_(column_families
),
1252 num_pending_file_creations_(0) {}
1253 virtual ~DbStressListener() {
1254 assert(num_pending_file_creations_
== 0);
1256 #ifndef ROCKSDB_LITE
1257 virtual void OnFlushCompleted(DB
* /*db*/, const FlushJobInfo
& info
) override
{
1258 assert(IsValidColumnFamilyName(info
.cf_name
));
1259 VerifyFilePath(info
.file_path
);
1260 // pretending doing some work here
1261 std::this_thread::sleep_for(
1262 std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
1265 virtual void OnCompactionCompleted(DB
* /*db*/,
1266 const CompactionJobInfo
& ci
) override
{
1267 assert(IsValidColumnFamilyName(ci
.cf_name
));
1268 assert(ci
.input_files
.size() + ci
.output_files
.size() > 0U);
1269 for (const auto& file_path
: ci
.input_files
) {
1270 VerifyFilePath(file_path
);
1272 for (const auto& file_path
: ci
.output_files
) {
1273 VerifyFilePath(file_path
);
1275 // pretending doing some work here
1276 std::this_thread::sleep_for(
1277 std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
1280 virtual void OnTableFileCreationStarted(
1281 const TableFileCreationBriefInfo
& /*info*/) override
{
1282 ++num_pending_file_creations_
;
1284 virtual void OnTableFileCreated(const TableFileCreationInfo
& info
) override
{
1285 assert(info
.db_name
== db_name_
);
1286 assert(IsValidColumnFamilyName(info
.cf_name
));
1287 if (info
.file_size
) {
1288 VerifyFilePath(info
.file_path
);
1290 assert(info
.job_id
> 0 || FLAGS_compact_files_one_in
> 0);
1291 if (info
.status
.ok() && info
.file_size
> 0) {
1292 assert(info
.table_properties
.data_size
> 0);
1293 assert(info
.table_properties
.raw_key_size
> 0);
1294 assert(info
.table_properties
.num_entries
> 0);
1296 --num_pending_file_creations_
;
// A CF name is valid if it is the default CF name or consists solely of
// decimal digits (stress-test CFs are named by number).
// NOTE(review): the `return true;` / `return false;` lines and closing
// braces are missing from this extract.
1300 bool IsValidColumnFamilyName(const std::string
& cf_name
) const {
1301 if (cf_name
== kDefaultColumnFamilyName
) {
1304 // The column family names in the stress tests are numbers.
1305 for (size_t i
= 0; i
< cf_name
.size(); ++i
) {
1306 if (cf_name
[i
] < '0' || cf_name
[i
] > '9') {
1313 void VerifyFileDir(const std::string
& file_dir
) {
1315 if (db_name_
== file_dir
) {
1318 for (const auto& db_path
: db_paths_
) {
1319 if (db_path
.path
== file_dir
) {
1323 for (auto& cf
: column_families_
) {
1324 for (const auto& cf_path
: cf
.options
.cf_paths
) {
1325 if (cf_path
.path
== file_dir
) {
1336 void VerifyFileName(const std::string
& file_name
) {
1338 uint64_t file_number
;
1340 bool result
= ParseFileName(file_name
, &file_number
, &file_type
);
1342 assert(file_type
== kTableFile
);
// Split a path at the last '/' and validate the directory part against the
// known DB/CF paths and the file-name part against SST naming. A path with
// no '/' is treated as a bare file name.
// NOTE(review): the else/closing lines (and likely an NDEBUG guard) are
// missing from this extract.
1348 void VerifyFilePath(const std::string
& file_path
) {
1350 size_t pos
= file_path
.find_last_of("/");
1351 if (pos
== std::string::npos
) {
1352 VerifyFileName(file_path
);
1355 VerifyFileDir(file_path
.substr(0, pos
));
1357 VerifyFileName(file_path
.substr(pos
));
1363 #endif // !ROCKSDB_LITE
1366 std::string db_name_
;
1367 std::vector
<DbPath
> db_paths_
;
1368 std::vector
<ColumnFamilyDescriptor
> column_families_
;
1369 std::atomic
<int> num_pending_file_creations_
;
1377 : cache_(NewCache(FLAGS_cache_size
)),
1378 compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size
)),
1379 filter_policy_(FLAGS_bloom_bits
>= 0
1380 ? FLAGS_use_block_based_filter
1381 ? NewBloomFilterPolicy(FLAGS_bloom_bits
, true)
1382 : NewBloomFilterPolicy(FLAGS_bloom_bits
, false)
1385 #ifndef ROCKSDB_LITE
1388 new_column_family_name_(1),
1389 num_times_reopened_(0) {
1390 if (FLAGS_destroy_db_initially
) {
1391 std::vector
<std::string
> files
;
1392 FLAGS_env
->GetChildren(FLAGS_db
, &files
);
1393 for (unsigned int i
= 0; i
< files
.size(); i
++) {
1394 if (Slice(files
[i
]).starts_with("heap-")) {
1395 FLAGS_env
->DeleteFile(FLAGS_db
+ "/" + files
[i
]);
1398 DestroyDB(FLAGS_db
, Options());
1402 virtual ~StressTest() {
1403 for (auto cf
: column_families_
) {
1406 column_families_
.clear();
1410 std::shared_ptr
<Cache
> NewCache(size_t capacity
) {
1411 if (capacity
<= 0) {
1414 if (FLAGS_use_clock_cache
) {
1415 auto cache
= NewClockCache((size_t)capacity
);
1417 fprintf(stderr
, "Clock cache not supported.");
1422 return NewLRUCache((size_t)capacity
);
1426 bool BuildOptionsTable() {
1427 if (FLAGS_set_options_one_in
<= 0) {
1431 std::unordered_map
<std::string
, std::vector
<std::string
> > options_tbl
= {
1432 {"write_buffer_size",
1433 {ToString(options_
.write_buffer_size
),
1434 ToString(options_
.write_buffer_size
* 2),
1435 ToString(options_
.write_buffer_size
* 4)}},
1436 {"max_write_buffer_number",
1437 {ToString(options_
.max_write_buffer_number
),
1438 ToString(options_
.max_write_buffer_number
* 2),
1439 ToString(options_
.max_write_buffer_number
* 4)}},
1440 {"arena_block_size",
1442 ToString(options_
.arena_block_size
),
1443 ToString(options_
.write_buffer_size
/ 4),
1444 ToString(options_
.write_buffer_size
/ 8),
1446 {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
1447 {"max_successive_merges", {"0", "2", "4"}},
1448 {"inplace_update_num_locks", {"100", "200", "300"}},
1449 // TODO(ljin): enable test for this option
1450 // {"disable_auto_compactions", {"100", "200", "300"}},
1451 {"soft_rate_limit", {"0", "0.5", "0.9"}},
1452 {"hard_rate_limit", {"0", "1.1", "2.0"}},
1453 {"level0_file_num_compaction_trigger",
1455 ToString(options_
.level0_file_num_compaction_trigger
),
1456 ToString(options_
.level0_file_num_compaction_trigger
+ 2),
1457 ToString(options_
.level0_file_num_compaction_trigger
+ 4),
1459 {"level0_slowdown_writes_trigger",
1461 ToString(options_
.level0_slowdown_writes_trigger
),
1462 ToString(options_
.level0_slowdown_writes_trigger
+ 2),
1463 ToString(options_
.level0_slowdown_writes_trigger
+ 4),
1465 {"level0_stop_writes_trigger",
1467 ToString(options_
.level0_stop_writes_trigger
),
1468 ToString(options_
.level0_stop_writes_trigger
+ 2),
1469 ToString(options_
.level0_stop_writes_trigger
+ 4),
1471 {"max_compaction_bytes",
1473 ToString(options_
.target_file_size_base
* 5),
1474 ToString(options_
.target_file_size_base
* 15),
1475 ToString(options_
.target_file_size_base
* 100),
1477 {"target_file_size_base",
1479 ToString(options_
.target_file_size_base
),
1480 ToString(options_
.target_file_size_base
* 2),
1481 ToString(options_
.target_file_size_base
* 4),
1483 {"target_file_size_multiplier",
1485 ToString(options_
.target_file_size_multiplier
), "1", "2",
1487 {"max_bytes_for_level_base",
1489 ToString(options_
.max_bytes_for_level_base
/ 2),
1490 ToString(options_
.max_bytes_for_level_base
),
1491 ToString(options_
.max_bytes_for_level_base
* 2),
1493 {"max_bytes_for_level_multiplier",
1495 ToString(options_
.max_bytes_for_level_multiplier
), "1", "2",
1497 {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
1500 options_table_
= std::move(options_tbl
);
1502 for (const auto& iter
: options_table_
) {
1503 options_index_
.push_back(iter
.first
);
1509 uint64_t now
= FLAGS_env
->NowMicros();
1510 fprintf(stdout
, "%s Initializing db_stress\n",
1511 FLAGS_env
->TimeToString(now
/ 1000000).c_str());
1514 BuildOptionsTable();
1515 SharedState
shared(this);
1516 uint32_t n
= shared
.GetNumThreads();
1518 now
= FLAGS_env
->NowMicros();
1519 fprintf(stdout
, "%s Initializing worker threads\n",
1520 FLAGS_env
->TimeToString(now
/ 1000000).c_str());
1521 std::vector
<ThreadState
*> threads(n
);
1522 for (uint32_t i
= 0; i
< n
; i
++) {
1523 threads
[i
] = new ThreadState(i
, &shared
);
1524 FLAGS_env
->StartThread(ThreadBody
, threads
[i
]);
1526 ThreadState
bg_thread(0, &shared
);
1527 if (FLAGS_compaction_thread_pool_adjust_interval
> 0) {
1528 FLAGS_env
->StartThread(PoolSizeChangeThread
, &bg_thread
);
1531 // Each thread goes through the following states:
1532 // initializing -> wait for others to init -> read/populate/depopulate
1533 // wait for others to operate -> verify -> done
1536 MutexLock
l(shared
.GetMutex());
1537 while (!shared
.AllInitialized()) {
1538 shared
.GetCondVar()->Wait();
1540 if (shared
.ShouldVerifyAtBeginning()) {
1541 if (shared
.HasVerificationFailedYet()) {
1542 printf("Crash-recovery verification failed :(\n");
1544 printf("Crash-recovery verification passed :)\n");
1548 now
= FLAGS_env
->NowMicros();
1549 fprintf(stdout
, "%s Starting database operations\n",
1550 FLAGS_env
->TimeToString(now
/1000000).c_str());
1553 shared
.GetCondVar()->SignalAll();
1554 while (!shared
.AllOperated()) {
1555 shared
.GetCondVar()->Wait();
1558 now
= FLAGS_env
->NowMicros();
1559 if (FLAGS_test_batches_snapshots
) {
1560 fprintf(stdout
, "%s Limited verification already done during gets\n",
1561 FLAGS_env
->TimeToString((uint64_t) now
/1000000).c_str());
1563 fprintf(stdout
, "%s Starting verification\n",
1564 FLAGS_env
->TimeToString((uint64_t) now
/1000000).c_str());
1567 shared
.SetStartVerify();
1568 shared
.GetCondVar()->SignalAll();
1569 while (!shared
.AllDone()) {
1570 shared
.GetCondVar()->Wait();
1574 for (unsigned int i
= 1; i
< n
; i
++) {
1575 threads
[0]->stats
.Merge(threads
[i
]->stats
);
1577 threads
[0]->stats
.Report("Stress Test");
1579 for (unsigned int i
= 0; i
< n
; i
++) {
1581 threads
[i
] = nullptr;
1583 now
= FLAGS_env
->NowMicros();
1584 if (!FLAGS_test_batches_snapshots
&& !shared
.HasVerificationFailedYet()) {
1585 fprintf(stdout
, "%s Verification successful\n",
1586 FLAGS_env
->TimeToString(now
/1000000).c_str());
1590 if (FLAGS_compaction_thread_pool_adjust_interval
> 0) {
1591 MutexLock
l(shared
.GetMutex());
1592 shared
.SetShouldStopBgThread();
1593 while (!shared
.BgThreadFinished()) {
1594 shared
.GetCondVar()->Wait();
1598 if (shared
.HasVerificationFailedYet()) {
1599 printf("Verification failed :(\n");
1606 static void ThreadBody(void* v
) {
1607 ThreadState
* thread
= reinterpret_cast<ThreadState
*>(v
);
1608 SharedState
* shared
= thread
->shared
;
1610 if (shared
->ShouldVerifyAtBeginning()) {
1611 thread
->shared
->GetStressTest()->VerifyDb(thread
);
1614 MutexLock
l(shared
->GetMutex());
1615 shared
->IncInitialized();
1616 if (shared
->AllInitialized()) {
1617 shared
->GetCondVar()->SignalAll();
1619 while (!shared
->Started()) {
1620 shared
->GetCondVar()->Wait();
1623 thread
->shared
->GetStressTest()->OperateDb(thread
);
1626 MutexLock
l(shared
->GetMutex());
1627 shared
->IncOperated();
1628 if (shared
->AllOperated()) {
1629 shared
->GetCondVar()->SignalAll();
1631 while (!shared
->VerifyStarted()) {
1632 shared
->GetCondVar()->Wait();
1636 thread
->shared
->GetStressTest()->VerifyDb(thread
);
1639 MutexLock
l(shared
->GetMutex());
1641 if (shared
->AllDone()) {
1642 shared
->GetCondVar()->SignalAll();
1647 static void PoolSizeChangeThread(void* v
) {
1648 assert(FLAGS_compaction_thread_pool_adjust_interval
> 0);
1649 ThreadState
* thread
= reinterpret_cast<ThreadState
*>(v
);
1650 SharedState
* shared
= thread
->shared
;
1654 MutexLock
l(shared
->GetMutex());
1655 if (shared
->ShoudStopBgThread()) {
1656 shared
->SetBgThreadFinish();
1657 shared
->GetCondVar()->SignalAll();
1662 auto thread_pool_size_base
= FLAGS_max_background_compactions
;
1663 auto thread_pool_size_var
= FLAGS_compaction_thread_pool_variations
;
1664 int new_thread_pool_size
=
1665 thread_pool_size_base
- thread_pool_size_var
+
1666 thread
->rand
.Next() % (thread_pool_size_var
* 2 + 1);
1667 if (new_thread_pool_size
< 1) {
1668 new_thread_pool_size
= 1;
1670 FLAGS_env
->SetBackgroundThreads(new_thread_pool_size
);
1671 // Sleep up to 3 seconds
1672 FLAGS_env
->SleepForMicroseconds(
1673 thread
->rand
.Next() % FLAGS_compaction_thread_pool_adjust_interval
*
1679 static void PrintKeyValue(int cf
, uint64_t key
, const char* value
,
1681 if (!FLAGS_verbose
) {
1685 tmp
.reserve(sz
* 2 + 16);
1687 for (size_t i
= 0; i
< sz
; i
++) {
1688 snprintf(buf
, 4, "%X", value
[i
]);
1691 fprintf(stdout
, "[CF %d] %" PRIi64
" == > (%" ROCKSDB_PRIszt
") %s\n", cf
,
1692 key
, sz
, tmp
.c_str());
// Pick a random key inside a window of FLAGS_active_width keys that slides
// from 0 toward FLAGS_max_key as the thread progresses through its ops,
// so early and late operations touch different key regions.
// NOTE(review): the closing brace line is missing from this extract.
1695 static int64_t GenerateOneKey(ThreadState
* thread
, uint64_t iteration
) {
1696 const double completed_ratio
=
1697 static_cast<double>(iteration
) / FLAGS_ops_per_thread
;
1698 const int64_t base_key
= static_cast<int64_t>(
1699 completed_ratio
* (FLAGS_max_key
- FLAGS_active_width
));
1700 return base_key
+ thread
->rand
.Next() % FLAGS_active_width
;
// Deterministically fill `v` from seed `rand`: the first 4 bytes hold the
// seed itself (so a read can recover it), the rest are rand ^ i. The size
// is a function of the seed, which is what keeps values verifiable.
// NOTE(review): the `size_t value_sz =` declaration line (content line
// 1704) and some closing lines are missing from this extract.
1703 static size_t GenerateValue(uint32_t rand
, char *v
, size_t max_sz
) {
1705 ((rand
% kRandomValueMaxFactor
) + 1) * FLAGS_value_size_mult
;
1706 assert(value_sz
<= max_sz
&& value_sz
>= sizeof(uint32_t));
1708 *((uint32_t*)v
) = rand
;
1709 for (size_t i
=sizeof(uint32_t); i
< value_sz
; i
++) {
1710 v
[i
] = (char)(rand
^ i
);
1713 return value_sz
; // the size of the value set.
1716 Status
AssertSame(DB
* db
, ColumnFamilyHandle
* cf
,
1717 ThreadState::SnapshotState
& snap_state
) {
1719 if (cf
->GetName() != snap_state
.cf_at_name
) {
1723 ropt
.snapshot
= snap_state
.snapshot
;
1724 PinnableSlice
exp_v(&snap_state
.value
);
1727 s
= db
->Get(ropt
, cf
, snap_state
.key
, &v
);
1728 if (!s
.ok() && !s
.IsNotFound()) {
1731 if (snap_state
.status
!= s
) {
1732 return Status::Corruption(
1733 "The snapshot gave inconsistent results for key " +
1734 ToString(Hash(snap_state
.key
.c_str(), snap_state
.key
.size(), 0)) +
1735 " in cf " + cf
->GetName() + ": (" + snap_state
.status
.ToString() +
1736 ") vs. (" + s
.ToString() + ")");
1740 return Status::Corruption("The snapshot gave inconsistent values: (" +
1741 exp_v
.ToString() + ") vs. (" + v
.ToString() +
1745 if (snap_state
.key_vec
!= nullptr) {
1746 std::unique_ptr
<Iterator
> iterator(db
->NewIterator(ropt
));
1747 std::unique_ptr
<std::vector
<bool>> tmp_bitvec(new std::vector
<bool>(FLAGS_max_key
));
1748 for (iterator
->SeekToFirst(); iterator
->Valid(); iterator
->Next()) {
1750 if (GetIntVal(iterator
->key().ToString(), &key_val
)) {
1751 (*tmp_bitvec
.get())[key_val
] = true;
1754 if (!std::equal(snap_state
.key_vec
->begin(),
1755 snap_state
.key_vec
->end(),
1756 tmp_bitvec
.get()->begin())) {
1757 return Status::Corruption("Found inconsistent keys at this snapshot");
1760 return Status::OK();
1763 Status
SetOptions(ThreadState
* thread
) {
1764 assert(FLAGS_set_options_one_in
> 0);
1765 std::unordered_map
<std::string
, std::string
> opts
;
1766 std::string name
= options_index_
[
1767 thread
->rand
.Next() % options_index_
.size()];
1768 int value_idx
= thread
->rand
.Next() % options_table_
[name
].size();
1769 if (name
== "soft_rate_limit" || name
== "hard_rate_limit") {
1770 opts
["soft_rate_limit"] = options_table_
["soft_rate_limit"][value_idx
];
1771 opts
["hard_rate_limit"] = options_table_
["hard_rate_limit"][value_idx
];
1772 } else if (name
== "level0_file_num_compaction_trigger" ||
1773 name
== "level0_slowdown_writes_trigger" ||
1774 name
== "level0_stop_writes_trigger") {
1775 opts
["level0_file_num_compaction_trigger"] =
1776 options_table_
["level0_file_num_compaction_trigger"][value_idx
];
1777 opts
["level0_slowdown_writes_trigger"] =
1778 options_table_
["level0_slowdown_writes_trigger"][value_idx
];
1779 opts
["level0_stop_writes_trigger"] =
1780 options_table_
["level0_stop_writes_trigger"][value_idx
];
1782 opts
[name
] = options_table_
[name
][value_idx
];
1785 int rand_cf_idx
= thread
->rand
.Next() % FLAGS_column_families
;
1786 auto cfh
= column_families_
[rand_cf_idx
];
1787 return db_
->SetOptions(cfh
, opts
);
1790 #ifndef ROCKSDB_LITE
// Begin a named transaction on txn_db_. Names are "xid<N>" with N drawn
// from a process-wide atomic counter, guaranteeing uniqueness across
// threads. Fails fast if --use_txn is not set.
// NOTE(review): the trailing `return s;` / closing brace lines are missing
// from this extract.
1791 Status
NewTxn(WriteOptions
& write_opts
, Transaction
** txn
) {
1792 if (!FLAGS_use_txn
) {
1793 return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
1795 static std::atomic
<uint64_t> txn_id
= {0};
1796 TransactionOptions txn_options
;
1797 *txn
= txn_db_
->BeginTransaction(write_opts
, txn_options
);
1798 auto istr
= std::to_string(txn_id
.fetch_add(1));
1799 Status s
= (*txn
)->SetName("xid" + istr
);
1803 Status
CommitTxn(Transaction
* txn
) {
1804 if (!FLAGS_use_txn
) {
1805 return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
1807 Status s
= txn
->Prepare();
1816 virtual void OperateDb(ThreadState
* thread
) {
1817 ReadOptions
read_opts(FLAGS_verify_checksum
, true);
1818 WriteOptions write_opts
;
1819 auto shared
= thread
->shared
;
1821 std::string from_db
;
1823 write_opts
.sync
= true;
1825 write_opts
.disableWAL
= FLAGS_disable_wal
;
1826 const int prefixBound
= (int)FLAGS_readpercent
+ (int)FLAGS_prefixpercent
;
1827 const int writeBound
= prefixBound
+ (int)FLAGS_writepercent
;
1828 const int delBound
= writeBound
+ (int)FLAGS_delpercent
;
1829 const int delRangeBound
= delBound
+ (int)FLAGS_delrangepercent
;
1831 thread
->stats
.Start();
1832 for (uint64_t i
= 0; i
< FLAGS_ops_per_thread
; i
++) {
1833 if (thread
->shared
->HasVerificationFailedYet()) {
1836 if (i
!= 0 && (i
% (FLAGS_ops_per_thread
/ (FLAGS_reopen
+ 1))) == 0) {
1838 thread
->stats
.FinishedSingleOp();
1839 MutexLock
l(thread
->shared
->GetMutex());
1840 while (!thread
->snapshot_queue
.empty()) {
1841 db_
->ReleaseSnapshot(
1842 thread
->snapshot_queue
.front().second
.snapshot
);
1843 delete thread
->snapshot_queue
.front().second
.key_vec
;
1844 thread
->snapshot_queue
.pop();
1846 thread
->shared
->IncVotedReopen();
1847 if (thread
->shared
->AllVotedReopen()) {
1848 thread
->shared
->GetStressTest()->Reopen();
1849 thread
->shared
->GetCondVar()->SignalAll();
1852 thread
->shared
->GetCondVar()->Wait();
1854 // Commenting this out as we don't want to reset stats on each open.
1855 // thread->stats.Start();
1860 if (FLAGS_set_options_one_in
> 0 &&
1861 thread
->rand
.OneIn(FLAGS_set_options_one_in
)) {
1865 if (FLAGS_set_in_place_one_in
> 0 &&
1866 thread
->rand
.OneIn(FLAGS_set_in_place_one_in
)) {
1867 options_
.inplace_update_support
^= options_
.inplace_update_support
;
1870 MaybeClearOneColumnFamily(thread
);
1872 #ifndef ROCKSDB_LITE
1873 if (FLAGS_checkpoint_one_in
> 0 &&
1874 thread
->rand
.Uniform(FLAGS_checkpoint_one_in
) == 0) {
1875 std::string checkpoint_dir
=
1876 FLAGS_db
+ "/.checkpoint" + ToString(thread
->tid
);
1877 DestroyDB(checkpoint_dir
, Options());
1878 Checkpoint
* checkpoint
;
1879 Status s
= Checkpoint::Create(db_
, &checkpoint
);
1881 s
= checkpoint
->CreateCheckpoint(checkpoint_dir
);
1883 std::vector
<std::string
> files
;
1885 s
= FLAGS_env
->GetChildren(checkpoint_dir
, &files
);
1887 DestroyDB(checkpoint_dir
, Options());
1890 printf("A checkpoint operation failed with: %s\n",
1891 s
.ToString().c_str());
1895 if (FLAGS_backup_one_in
> 0 &&
1896 thread
->rand
.Uniform(FLAGS_backup_one_in
) == 0) {
1897 std::string backup_dir
= FLAGS_db
+ "/.backup" + ToString(thread
->tid
);
1898 BackupableDBOptions
backup_opts(backup_dir
);
1899 BackupEngine
* backup_engine
= nullptr;
1900 Status s
= BackupEngine::Open(FLAGS_env
, backup_opts
, &backup_engine
);
1902 s
= backup_engine
->CreateNewBackup(db_
);
1905 s
= backup_engine
->PurgeOldBackups(0 /* num_backups_to_keep */);
1908 printf("A BackupEngine operation failed with: %s\n",
1909 s
.ToString().c_str());
1911 if (backup_engine
!= nullptr) {
1912 delete backup_engine
;
1916 if (FLAGS_compact_files_one_in
> 0 &&
1917 thread
->rand
.Uniform(FLAGS_compact_files_one_in
) == 0) {
1919 column_families_
[thread
->rand
.Next() % FLAGS_column_families
];
1920 rocksdb::ColumnFamilyMetaData cf_meta_data
;
1921 db_
->GetColumnFamilyMetaData(random_cf
, &cf_meta_data
);
1923 // Randomly compact up to three consecutive files from a level
1924 const int kMaxRetry
= 3;
1925 for (int attempt
= 0; attempt
< kMaxRetry
; ++attempt
) {
1926 size_t random_level
= thread
->rand
.Uniform(
1927 static_cast<int>(cf_meta_data
.levels
.size()));
1929 const auto& files
= cf_meta_data
.levels
[random_level
].files
;
1930 if (files
.size() > 0) {
1931 size_t random_file_index
=
1932 thread
->rand
.Uniform(static_cast<int>(files
.size()));
1933 if (files
[random_file_index
].being_compacted
) {
1934 // Retry as the selected file is currently being compacted
1938 std::vector
<std::string
> input_files
;
1939 input_files
.push_back(files
[random_file_index
].name
);
1940 if (random_file_index
> 0 &&
1941 !files
[random_file_index
- 1].being_compacted
) {
1942 input_files
.push_back(files
[random_file_index
- 1].name
);
1944 if (random_file_index
+ 1 < files
.size() &&
1945 !files
[random_file_index
+ 1].being_compacted
) {
1946 input_files
.push_back(files
[random_file_index
+ 1].name
);
1949 size_t output_level
=
1950 std::min(random_level
+ 1, cf_meta_data
.levels
.size() - 1);
1952 db_
->CompactFiles(CompactionOptions(), random_cf
, input_files
,
1953 static_cast<int>(output_level
));
1955 fprintf(stdout
, "Unable to perform CompactFiles(): %s\n",
1956 s
.ToString().c_str());
1957 thread
->stats
.AddNumCompactFilesFailed(1);
1959 thread
->stats
.AddNumCompactFilesSucceed(1);
1965 #endif // !ROCKSDB_LITE
1966 int64_t rand_key
= GenerateOneKey(thread
, i
);
1967 int rand_column_family
= thread
->rand
.Next() % FLAGS_column_families
;
1968 std::string keystr
= Key(rand_key
);
1970 std::unique_ptr
<MutexLock
> lock
;
1971 if (ShouldAcquireMutexOnKey()) {
1972 lock
.reset(new MutexLock(
1973 shared
->GetMutexForKey(rand_column_family
, rand_key
)));
1976 auto column_family
= column_families_
[rand_column_family
];
1978 if (FLAGS_flush_one_in
> 0 &&
1979 thread
->rand
.Uniform(FLAGS_flush_one_in
) == 0) {
1980 FlushOptions flush_opts
;
1981 Status status
= db_
->Flush(flush_opts
, column_family
);
1983 fprintf(stdout
, "Unable to perform Flush(): %s\n", status
.ToString().c_str());
1987 if (FLAGS_compact_range_one_in
> 0 &&
1988 thread
->rand
.Uniform(FLAGS_compact_range_one_in
) == 0) {
1989 int64_t end_key_num
;
1990 if (port::kMaxInt64
- rand_key
< FLAGS_compact_range_width
) {
1991 end_key_num
= port::kMaxInt64
;
1993 end_key_num
= FLAGS_compact_range_width
+ rand_key
;
1995 std::string end_key_buf
= Key(end_key_num
);
1996 Slice
end_key(end_key_buf
);
1998 CompactRangeOptions cro
;
1999 cro
.exclusive_manual_compaction
=
2000 static_cast<bool>(thread
->rand
.Next() % 2);
2001 Status status
= db_
->CompactRange(cro
, column_family
, &key
, &end_key
);
2003 printf("Unable to perform CompactRange(): %s\n",
2004 status
.ToString().c_str());
2008 std::vector
<int> rand_column_families
=
2009 GenerateColumnFamilies(FLAGS_column_families
, rand_column_family
);
2010 std::vector
<int64_t> rand_keys
= GenerateKeys(rand_key
);
2012 if (FLAGS_ingest_external_file_one_in
> 0 &&
2013 thread
->rand
.Uniform(FLAGS_ingest_external_file_one_in
) == 0) {
2014 TestIngestExternalFile(thread
, rand_column_families
, rand_keys
, lock
);
2017 if (FLAGS_acquire_snapshot_one_in
> 0 &&
2018 thread
->rand
.Uniform(FLAGS_acquire_snapshot_one_in
) == 0) {
2019 auto snapshot
= db_
->GetSnapshot();
2021 ropt
.snapshot
= snapshot
;
2022 std::string value_at
;
2023 // When taking a snapshot, we also read a key from that snapshot. We
2024 // will later read the same key before releasing the snapshot and verify
2025 // that the results are the same.
2026 auto status_at
= db_
->Get(ropt
, column_family
, key
, &value_at
);
2027 std::vector
<bool> *key_vec
= nullptr;
2029 if (FLAGS_compare_full_db_state_snapshot
&&
2030 (thread
->tid
== 0)) {
2031 key_vec
= new std::vector
<bool>(FLAGS_max_key
);
2032 std::unique_ptr
<Iterator
> iterator(db_
->NewIterator(ropt
));
2033 for (iterator
->SeekToFirst(); iterator
->Valid(); iterator
->Next()) {
2035 if (GetIntVal(iterator
->key().ToString(), &key_val
)) {
2036 (*key_vec
)[key_val
] = true;
2041 ThreadState::SnapshotState snap_state
= {
2042 snapshot
, rand_column_family
, column_family
->GetName(),
2043 keystr
, status_at
, value_at
, key_vec
};
2044 thread
->snapshot_queue
.emplace(
2045 std::min(FLAGS_ops_per_thread
- 1, i
+ FLAGS_snapshot_hold_ops
),
2048 while (!thread
->snapshot_queue
.empty() &&
2049 i
== thread
->snapshot_queue
.front().first
) {
2050 auto snap_state
= thread
->snapshot_queue
.front().second
;
2051 assert(snap_state
.snapshot
);
2052 // Note: this is unsafe as the cf might be dropped concurrently. But it
2053 // is ok since unclean cf drop is cunnrently not supported by write
2054 // prepared transactions.
2056 AssertSame(db_
, column_families_
[snap_state
.cf_at
], snap_state
);
2058 VerificationAbort(shared
, "Snapshot gave inconsistent state", s
);
2060 db_
->ReleaseSnapshot(snap_state
.snapshot
);
2061 delete snap_state
.key_vec
;
2062 thread
->snapshot_queue
.pop();
2065 int prob_op
= thread
->rand
.Uniform(100);
2066 if (prob_op
>= 0 && prob_op
< (int)FLAGS_readpercent
) {
2068 TestGet(thread
, read_opts
, rand_column_families
, rand_keys
);
2069 } else if ((int)FLAGS_readpercent
<= prob_op
&& prob_op
< prefixBound
) {
2070 // OPERATION prefix scan
2071 // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
2072 // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
2073 // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
2075 TestPrefixScan(thread
, read_opts
, rand_column_families
, rand_keys
);
2076 } else if (prefixBound
<= prob_op
&& prob_op
< writeBound
) {
2078 TestPut(thread
, write_opts
, read_opts
, rand_column_families
, rand_keys
,
2080 } else if (writeBound
<= prob_op
&& prob_op
< delBound
) {
2082 TestDelete(thread
, write_opts
, rand_column_families
, rand_keys
, lock
);
2083 } else if (delBound
<= prob_op
&& prob_op
< delRangeBound
) {
2084 // OPERATION delete range
2085 TestDeleteRange(thread
, write_opts
, rand_column_families
, rand_keys
,
2088 // OPERATION iterate
2089 TestIterate(thread
, read_opts
, rand_column_families
, rand_keys
);
2091 thread
->stats
.FinishedSingleOp();
2094 thread
->stats
.Stop();
  // Verifies the whole DB contents against the expected state tracked in
  // SharedState. Implemented per test mode (batched vs. non-batched).
  virtual void VerifyDb(ThreadState* thread) const = 0;

  // Hook invoked from the operation loop; subclasses may occasionally drop
  // and recreate a column family here. Default is a no-op.
  virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}

  // Whether the operation loop must hold the per-key mutex around each
  // operation. Default is false; modes that track per-key expected state
  // override this to return true.
  virtual bool ShouldAcquireMutexOnKey() const { return false; }
2103 virtual std::vector
<int> GenerateColumnFamilies(
2104 const int /* num_column_families */, int rand_column_family
) const {
2105 return {rand_column_family
};
2108 virtual std::vector
<int64_t> GenerateKeys(int64_t rand_key
) const {
  // The pure-virtual operations below are the per-mode primitives driven by
  // the operation loop. Each receives the operating thread plus pre-sampled
  // column-family indices and key indices, and returns the status of the
  // underlying DB call.

  // Point lookup of the sampled key(s).
  virtual Status TestGet(ThreadState* thread,
                         const ReadOptions& read_opts,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys) = 0;

  // Iterates over keys sharing the sampled key's prefix.
  virtual Status TestPrefixScan(ThreadState* thread,
                                const ReadOptions& read_opts,
                                const std::vector<int>& rand_column_families,
                                const std::vector<int64_t>& rand_keys) = 0;

  // Writes the sampled key(s). `value` is scratch space for value
  // generation; `lock` carries the per-key mutex where the mode requires it.
  virtual Status TestPut(ThreadState* thread,
                         WriteOptions& write_opts, const ReadOptions& read_opts,
                         const std::vector<int>& cf_ids,
                         const std::vector<int64_t>& keys,
                         char (&value)[100],
                         std::unique_ptr<MutexLock>& lock) = 0;

  // Deletes the sampled key(s).
  virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                            const std::vector<int>& rand_column_families,
                            const std::vector<int64_t>& rand_keys,
                            std::unique_ptr<MutexLock>& lock) = 0;

  // Deletes a contiguous key range anchored at the sampled key.
  virtual Status TestDeleteRange(ThreadState* thread,
                                 WriteOptions& write_opts,
                                 const std::vector<int>& rand_column_families,
                                 const std::vector<int64_t>& rand_keys,
                                 std::unique_ptr<MutexLock>& lock) = 0;

  // Builds an external SST file for a key range and ingests it into the DB.
  virtual void TestIngestExternalFile(
      ThreadState* thread, const std::vector<int>& rand_column_families,
      const std::vector<int64_t>& rand_keys,
      std::unique_ptr<MutexLock>& lock) = 0;
2143 // Given a key K, this creates an iterator which scans to K and then
2144 // does a random sequence of Next/Prev operations.
2145 virtual Status
TestIterate(ThreadState
* thread
,
2146 const ReadOptions
& read_opts
,
2147 const std::vector
<int>& rand_column_families
,
2148 const std::vector
<int64_t>& rand_keys
) {
2150 const Snapshot
* snapshot
= db_
->GetSnapshot();
2151 ReadOptions readoptionscopy
= read_opts
;
2152 readoptionscopy
.snapshot
= snapshot
;
2154 std::string upper_bound_str
;
2156 if (thread
->rand
.OneIn(16)) {
2157 // in 1/16 chance, set a iterator upper bound
2158 int64_t rand_upper_key
= GenerateOneKey(thread
, FLAGS_ops_per_thread
);
2159 upper_bound_str
= Key(rand_upper_key
);
2160 upper_bound
= Slice(upper_bound_str
);
2161 // uppder_bound can be smaller than seek key, but the query itself
2162 // should not crash either.
2163 readoptionscopy
.iterate_upper_bound
= &upper_bound
;
2165 std::string lower_bound_str
;
2167 if (thread
->rand
.OneIn(16)) {
2168 // in 1/16 chance, set a iterator lower bound
2169 int64_t rand_lower_key
= GenerateOneKey(thread
, FLAGS_ops_per_thread
);
2170 lower_bound_str
= Key(rand_lower_key
);
2171 lower_bound
= Slice(lower_bound_str
);
2172 // uppder_bound can be smaller than seek key, but the query itself
2173 // should not crash either.
2174 readoptionscopy
.iterate_lower_bound
= &lower_bound
;
2177 auto cfh
= column_families_
[rand_column_families
[0]];
2178 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(readoptionscopy
, cfh
));
2180 std::string key_str
= Key(rand_keys
[0]);
2181 Slice key
= key_str
;
2183 for (uint64_t i
= 0; i
< FLAGS_num_iterations
&& iter
->Valid(); i
++) {
2184 if (thread
->rand
.OneIn(2)) {
2192 thread
->stats
.AddIterations(1);
2194 thread
->stats
.AddErrors(1);
2197 db_
->ReleaseSnapshot(snapshot
);
2202 void VerificationAbort(SharedState
* shared
, std::string msg
, Status s
) const {
2203 printf("Verification failed: %s. Status is %s\n", msg
.c_str(),
2204 s
.ToString().c_str());
2205 shared
->SetVerificationFailure();
2208 void VerificationAbort(SharedState
* shared
, std::string msg
, int cf
,
2209 int64_t key
) const {
2210 printf("Verification failed for column family %d key %" PRIi64
": %s\n", cf
, key
,
2212 shared
->SetVerificationFailure();
2215 void PrintEnv() const {
2216 fprintf(stdout
, "RocksDB version : %d.%d\n", kMajorVersion
,
2218 fprintf(stdout
, "Format version : %d\n", FLAGS_format_version
);
2219 fprintf(stdout
, "TransactionDB : %s\n",
2220 FLAGS_use_txn
? "true" : "false");
2221 fprintf(stdout
, "Column families : %d\n", FLAGS_column_families
);
2222 if (!FLAGS_test_batches_snapshots
) {
2223 fprintf(stdout
, "Clear CFs one in : %d\n",
2224 FLAGS_clear_column_family_one_in
);
2226 fprintf(stdout
, "Number of threads : %d\n", FLAGS_threads
);
2227 fprintf(stdout
, "Ops per thread : %lu\n",
2228 (unsigned long)FLAGS_ops_per_thread
);
2229 std::string
ttl_state("unused");
2230 if (FLAGS_ttl
> 0) {
2231 ttl_state
= NumberToString(FLAGS_ttl
);
2233 fprintf(stdout
, "Time to live(sec) : %s\n", ttl_state
.c_str());
2234 fprintf(stdout
, "Read percentage : %d%%\n", FLAGS_readpercent
);
2235 fprintf(stdout
, "Prefix percentage : %d%%\n", FLAGS_prefixpercent
);
2236 fprintf(stdout
, "Write percentage : %d%%\n", FLAGS_writepercent
);
2237 fprintf(stdout
, "Delete percentage : %d%%\n", FLAGS_delpercent
);
2238 fprintf(stdout
, "Delete range percentage : %d%%\n", FLAGS_delrangepercent
);
2239 fprintf(stdout
, "No overwrite percentage : %d%%\n",
2240 FLAGS_nooverwritepercent
);
2241 fprintf(stdout
, "Iterate percentage : %d%%\n", FLAGS_iterpercent
);
2242 fprintf(stdout
, "DB-write-buffer-size : %" PRIu64
"\n",
2243 FLAGS_db_write_buffer_size
);
2244 fprintf(stdout
, "Write-buffer-size : %d\n",
2245 FLAGS_write_buffer_size
);
2246 fprintf(stdout
, "Iterations : %lu\n",
2247 (unsigned long)FLAGS_num_iterations
);
2248 fprintf(stdout
, "Max key : %lu\n",
2249 (unsigned long)FLAGS_max_key
);
2250 fprintf(stdout
, "Ratio #ops/#keys : %f\n",
2251 (1.0 * FLAGS_ops_per_thread
* FLAGS_threads
) / FLAGS_max_key
);
2252 fprintf(stdout
, "Num times DB reopens : %d\n", FLAGS_reopen
);
2253 fprintf(stdout
, "Batches/snapshots : %d\n",
2254 FLAGS_test_batches_snapshots
);
2255 fprintf(stdout
, "Do update in place : %d\n", FLAGS_in_place_update
);
2256 fprintf(stdout
, "Num keys per lock : %d\n",
2257 1 << FLAGS_log2_keys_per_lock
);
2258 std::string compression
= CompressionTypeToString(FLAGS_compression_type_e
);
2259 fprintf(stdout
, "Compression : %s\n", compression
.c_str());
2260 std::string checksum
= ChecksumTypeToString(FLAGS_checksum_type_e
);
2261 fprintf(stdout
, "Checksum type : %s\n", checksum
.c_str());
2262 fprintf(stdout
, "Max subcompactions : %" PRIu64
"\n",
2263 FLAGS_subcompactions
);
2265 const char* memtablerep
= "";
2266 switch (FLAGS_rep_factory
) {
2268 memtablerep
= "skip_list";
2271 memtablerep
= "prefix_hash";
2274 memtablerep
= "vector";
2278 fprintf(stdout
, "Memtablerep : %s\n", memtablerep
);
2280 fprintf(stdout
, "Test kill odd : %d\n", rocksdb_kill_odds
);
2281 if (!rocksdb_kill_prefix_blacklist
.empty()) {
2282 fprintf(stdout
, "Skipping kill points prefixes:\n");
2283 for (auto& p
: rocksdb_kill_prefix_blacklist
) {
2284 fprintf(stdout
, " %s\n", p
.c_str());
2288 fprintf(stdout
, "------------------------------------------------\n");
2292 assert(db_
== nullptr);
2293 #ifndef ROCKSDB_LITE
2294 assert(txn_db_
== nullptr);
2296 if (FLAGS_options_file
.empty()) {
2297 BlockBasedTableOptions block_based_options
;
2298 block_based_options
.block_cache
= cache_
;
2299 block_based_options
.block_cache_compressed
= compressed_cache_
;
2300 block_based_options
.checksum
= FLAGS_checksum_type_e
;
2301 block_based_options
.block_size
= FLAGS_block_size
;
2302 block_based_options
.format_version
=
2303 static_cast<uint32_t>(FLAGS_format_version
);
2304 block_based_options
.index_block_restart_interval
=
2305 static_cast<int32_t>(FLAGS_index_block_restart_interval
);
2306 block_based_options
.filter_policy
= filter_policy_
;
2307 options_
.table_factory
.reset(
2308 NewBlockBasedTableFactory(block_based_options
));
2309 options_
.db_write_buffer_size
= FLAGS_db_write_buffer_size
;
2310 options_
.write_buffer_size
= FLAGS_write_buffer_size
;
2311 options_
.max_write_buffer_number
= FLAGS_max_write_buffer_number
;
2312 options_
.min_write_buffer_number_to_merge
=
2313 FLAGS_min_write_buffer_number_to_merge
;
2314 options_
.max_write_buffer_number_to_maintain
=
2315 FLAGS_max_write_buffer_number_to_maintain
;
2316 options_
.memtable_prefix_bloom_size_ratio
=
2317 FLAGS_memtable_prefix_bloom_size_ratio
;
2318 options_
.max_background_compactions
= FLAGS_max_background_compactions
;
2319 options_
.max_background_flushes
= FLAGS_max_background_flushes
;
2320 options_
.compaction_style
=
2321 static_cast<rocksdb::CompactionStyle
>(FLAGS_compaction_style
);
2322 options_
.prefix_extractor
.reset(
2323 NewFixedPrefixTransform(FLAGS_prefix_size
));
2324 options_
.max_open_files
= FLAGS_open_files
;
2325 options_
.statistics
= dbstats
;
2326 options_
.env
= FLAGS_env
;
2327 options_
.use_fsync
= FLAGS_use_fsync
;
2328 options_
.compaction_readahead_size
= FLAGS_compaction_readahead_size
;
2329 options_
.allow_mmap_reads
= FLAGS_mmap_read
;
2330 options_
.allow_mmap_writes
= FLAGS_mmap_write
;
2331 options_
.use_direct_reads
= FLAGS_use_direct_reads
;
2332 options_
.use_direct_io_for_flush_and_compaction
=
2333 FLAGS_use_direct_io_for_flush_and_compaction
;
2334 options_
.target_file_size_base
= FLAGS_target_file_size_base
;
2335 options_
.target_file_size_multiplier
= FLAGS_target_file_size_multiplier
;
2336 options_
.max_bytes_for_level_base
= FLAGS_max_bytes_for_level_base
;
2337 options_
.max_bytes_for_level_multiplier
=
2338 FLAGS_max_bytes_for_level_multiplier
;
2339 options_
.level0_stop_writes_trigger
= FLAGS_level0_stop_writes_trigger
;
2340 options_
.level0_slowdown_writes_trigger
=
2341 FLAGS_level0_slowdown_writes_trigger
;
2342 options_
.level0_file_num_compaction_trigger
=
2343 FLAGS_level0_file_num_compaction_trigger
;
2344 options_
.compression
= FLAGS_compression_type_e
;
2345 options_
.compression_opts
.max_dict_bytes
=
2346 FLAGS_compression_max_dict_bytes
;
2347 options_
.compression_opts
.zstd_max_train_bytes
=
2348 FLAGS_compression_zstd_max_train_bytes
;
2349 options_
.create_if_missing
= true;
2350 options_
.max_manifest_file_size
= FLAGS_max_manifest_file_size
;
2351 options_
.inplace_update_support
= FLAGS_in_place_update
;
2352 options_
.max_subcompactions
= static_cast<uint32_t>(FLAGS_subcompactions
);
2353 options_
.allow_concurrent_memtable_write
=
2354 FLAGS_allow_concurrent_memtable_write
;
2355 options_
.enable_pipelined_write
= FLAGS_enable_pipelined_write
;
2356 options_
.enable_write_thread_adaptive_yield
=
2357 FLAGS_enable_write_thread_adaptive_yield
;
2358 options_
.compaction_options_universal
.size_ratio
=
2359 FLAGS_universal_size_ratio
;
2360 options_
.compaction_options_universal
.min_merge_width
=
2361 FLAGS_universal_min_merge_width
;
2362 options_
.compaction_options_universal
.max_merge_width
=
2363 FLAGS_universal_max_merge_width
;
2364 options_
.compaction_options_universal
.max_size_amplification_percent
=
2365 FLAGS_universal_max_size_amplification_percent
;
2368 fprintf(stderr
, "--options_file not supported in lite mode\n");
2371 DBOptions db_options
;
2372 std::vector
<ColumnFamilyDescriptor
> cf_descriptors
;
2373 Status s
= LoadOptionsFromFile(FLAGS_options_file
, Env::Default(),
2374 &db_options
, &cf_descriptors
);
2376 fprintf(stderr
, "Unable to load options file %s --- %s\n",
2377 FLAGS_options_file
.c_str(), s
.ToString().c_str());
2380 options_
= Options(db_options
, cf_descriptors
[0].options
);
2381 #endif // ROCKSDB_LITE
2384 if (FLAGS_rate_limiter_bytes_per_sec
> 0) {
2385 options_
.rate_limiter
.reset(NewGenericRateLimiter(
2386 FLAGS_rate_limiter_bytes_per_sec
, 1000 /* refill_period_us */,
2388 FLAGS_rate_limit_bg_reads
? RateLimiter::Mode::kReadsOnly
2389 : RateLimiter::Mode::kWritesOnly
));
2390 if (FLAGS_rate_limit_bg_reads
) {
2391 options_
.new_table_reader_for_compaction_inputs
= true;
2395 if (FLAGS_prefix_size
== 0 && FLAGS_rep_factory
== kHashSkipList
) {
2397 "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2400 if (FLAGS_prefix_size
!= 0 && FLAGS_rep_factory
!= kHashSkipList
) {
2402 "WARNING: prefix_size is non-zero but "
2403 "memtablerep != prefix_hash\n");
2405 switch (FLAGS_rep_factory
) {
2407 // no need to do anything
2409 #ifndef ROCKSDB_LITE
2411 options_
.memtable_factory
.reset(NewHashSkipListRepFactory(10000));
2414 options_
.memtable_factory
.reset(new VectorRepFactory());
2419 "RocksdbLite only supports skip list mem table. Skip "
2421 #endif // ROCKSDB_LITE
2424 if (FLAGS_use_full_merge_v1
) {
2425 options_
.merge_operator
= MergeOperators::CreateDeprecatedPutOperator();
2427 options_
.merge_operator
= MergeOperators::CreatePutOperator();
2430 fprintf(stdout
, "DB path: [%s]\n", FLAGS_db
.c_str());
2433 if (FLAGS_ttl
== -1) {
2434 std::vector
<std::string
> existing_column_families
;
2435 s
= DB::ListColumnFamilies(DBOptions(options_
), FLAGS_db
,
2436 &existing_column_families
); // ignore errors
2439 assert(existing_column_families
.empty());
2440 assert(column_family_names_
.empty());
2441 column_family_names_
.push_back(kDefaultColumnFamilyName
);
2442 } else if (column_family_names_
.empty()) {
2443 // this is the first call to the function Open()
2444 column_family_names_
= existing_column_families
;
2446 // this is a reopen. just assert that existing column_family_names are
2447 // equivalent to what we remember
2448 auto sorted_cfn
= column_family_names_
;
2449 std::sort(sorted_cfn
.begin(), sorted_cfn
.end());
2450 std::sort(existing_column_families
.begin(),
2451 existing_column_families
.end());
2452 if (sorted_cfn
!= existing_column_families
) {
2454 "Expected column families differ from the existing:\n");
2455 printf("Expected: {");
2456 for (auto cf
: sorted_cfn
) {
2457 printf("%s ", cf
.c_str());
2460 printf("Existing: {");
2461 for (auto cf
: existing_column_families
) {
2462 printf("%s ", cf
.c_str());
2466 assert(sorted_cfn
== existing_column_families
);
2468 std::vector
<ColumnFamilyDescriptor
> cf_descriptors
;
2469 for (auto name
: column_family_names_
) {
2470 if (name
!= kDefaultColumnFamilyName
) {
2471 new_column_family_name_
=
2472 std::max(new_column_family_name_
.load(), std::stoi(name
) + 1);
2474 cf_descriptors
.emplace_back(name
, ColumnFamilyOptions(options_
));
2476 while (cf_descriptors
.size() < (size_t)FLAGS_column_families
) {
2477 std::string name
= ToString(new_column_family_name_
.load());
2478 new_column_family_name_
++;
2479 cf_descriptors
.emplace_back(name
, ColumnFamilyOptions(options_
));
2480 column_family_names_
.push_back(name
);
2482 options_
.listeners
.clear();
2483 options_
.listeners
.emplace_back(
2484 new DbStressListener(FLAGS_db
, options_
.db_paths
, cf_descriptors
));
2485 options_
.create_missing_column_families
= true;
2486 if (!FLAGS_use_txn
) {
2487 s
= DB::Open(DBOptions(options_
), FLAGS_db
, cf_descriptors
,
2488 &column_families_
, &db_
);
2490 #ifndef ROCKSDB_LITE
2491 TransactionDBOptions txn_db_options
;
2492 // For the moment it is sufficient to test WRITE_PREPARED policy
2493 txn_db_options
.write_policy
= TxnDBWritePolicy::WRITE_PREPARED
;
2494 s
= TransactionDB::Open(options_
, txn_db_options
, FLAGS_db
,
2495 cf_descriptors
, &column_families_
, &txn_db_
);
2497 // after a crash, rollback to commit recovered transactions
2498 std::vector
<Transaction
*> trans
;
2499 txn_db_
->GetAllPreparedTransactions(&trans
);
2500 Random
rand(static_cast<uint32_t>(FLAGS_seed
));
2501 for (auto txn
: trans
) {
2502 if (rand
.OneIn(2)) {
2506 s
= txn
->Rollback();
2512 txn_db_
->GetAllPreparedTransactions(&trans
);
2513 assert(trans
.size() == 0);
2516 assert(!s
.ok() || column_families_
.size() ==
2517 static_cast<size_t>(FLAGS_column_families
));
2519 #ifndef ROCKSDB_LITE
2520 DBWithTTL
* db_with_ttl
;
2521 s
= DBWithTTL::Open(options_
, FLAGS_db
, &db_with_ttl
, FLAGS_ttl
);
2524 fprintf(stderr
, "TTL is not supported in RocksDBLite\n");
2529 fprintf(stderr
, "open error: %s\n", s
.ToString().c_str());
2535 for (auto cf
: column_families_
) {
2538 column_families_
.clear();
2541 #ifndef ROCKSDB_LITE
2545 num_times_reopened_
++;
2546 auto now
= FLAGS_env
->NowMicros();
2547 fprintf(stdout
, "%s Reopening database for the %dth time\n",
2548 FLAGS_env
->TimeToString(now
/1000000).c_str(),
2549 num_times_reopened_
);
2553 void PrintStatistics() {
2555 fprintf(stdout
, "STATISTICS:\n%s\n", dbstats
->ToString().c_str());
2559 std::shared_ptr
<Cache
> cache_
;
2560 std::shared_ptr
<Cache
> compressed_cache_
;
2561 std::shared_ptr
<const FilterPolicy
> filter_policy_
;
2563 #ifndef ROCKSDB_LITE
2564 TransactionDB
* txn_db_
;
2567 std::vector
<ColumnFamilyHandle
*> column_families_
;
2568 std::vector
<std::string
> column_family_names_
;
2569 std::atomic
<int> new_column_family_name_
;
2570 int num_times_reopened_
;
2571 std::unordered_map
<std::string
, std::vector
<std::string
>> options_table_
;
2572 std::vector
<std::string
> options_index_
;
2575 class NonBatchedOpsStressTest
: public StressTest
{
  // Stress-test mode that issues one DB operation per sampled key and
  // tracks the expected value of every key in SharedState.
  NonBatchedOpsStressTest() {}

  virtual ~NonBatchedOpsStressTest() {}
2581 virtual void VerifyDb(ThreadState
* thread
) const {
2582 ReadOptions
options(FLAGS_verify_checksum
, true);
2583 auto shared
= thread
->shared
;
2584 const int64_t max_key
= shared
->GetMaxKey();
2585 const int64_t keys_per_thread
= max_key
/ shared
->GetNumThreads();
2586 int64_t start
= keys_per_thread
* thread
->tid
;
2587 int64_t end
= start
+ keys_per_thread
;
2588 if (thread
->tid
== shared
->GetNumThreads() - 1) {
2591 for (size_t cf
= 0; cf
< column_families_
.size(); ++cf
) {
2592 if (thread
->shared
->HasVerificationFailedYet()) {
2595 if (!thread
->rand
.OneIn(2)) {
2596 // Use iterator to verify this range
2597 unique_ptr
<Iterator
> iter(
2598 db_
->NewIterator(options
, column_families_
[cf
]));
2599 iter
->Seek(Key(start
));
2600 for (auto i
= start
; i
< end
; i
++) {
2601 if (thread
->shared
->HasVerificationFailedYet()) {
2604 // TODO(ljin): update "long" to uint64_t
2605 // Reseek when the prefix changes
2606 if (i
% (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size
)) ==
2610 std::string from_db
;
2611 std::string keystr
= Key(i
);
2613 Status s
= iter
->status();
2614 if (iter
->Valid()) {
2615 if (iter
->key().compare(k
) > 0) {
2616 s
= Status::NotFound(Slice());
2617 } else if (iter
->key().compare(k
) == 0) {
2618 from_db
= iter
->value().ToString();
2620 } else if (iter
->key().compare(k
) < 0) {
2621 VerificationAbort(shared
, "An out of range key was found",
2622 static_cast<int>(cf
), i
);
2625 // The iterator found no value for the key in question, so do not
2626 // move to the next item in the iterator
2627 s
= Status::NotFound(Slice());
2629 VerifyValue(static_cast<int>(cf
), i
, options
, shared
, from_db
, s
,
2631 if (from_db
.length()) {
2632 PrintKeyValue(static_cast<int>(cf
), static_cast<uint32_t>(i
),
2633 from_db
.data(), from_db
.length());
2637 // Use Get to verify this range
2638 for (auto i
= start
; i
< end
; i
++) {
2639 if (thread
->shared
->HasVerificationFailedYet()) {
2642 std::string from_db
;
2643 std::string keystr
= Key(i
);
2645 Status s
= db_
->Get(options
, column_families_
[cf
], k
, &from_db
);
2646 VerifyValue(static_cast<int>(cf
), i
, options
, shared
, from_db
, s
,
2648 if (from_db
.length()) {
2649 PrintKeyValue(static_cast<int>(cf
), static_cast<uint32_t>(i
),
2650 from_db
.data(), from_db
.length());
2657 virtual void MaybeClearOneColumnFamily(ThreadState
* thread
) {
2658 if (FLAGS_clear_column_family_one_in
!= 0 && FLAGS_column_families
> 1) {
2659 if (thread
->rand
.OneIn(FLAGS_clear_column_family_one_in
)) {
2660 // drop column family and then create it again (can't drop default)
2661 int cf
= thread
->rand
.Next() % (FLAGS_column_families
- 1) + 1;
2662 std::string new_name
=
2663 ToString(new_column_family_name_
.fetch_add(1));
2665 MutexLock
l(thread
->shared
->GetMutex());
2668 "[CF %d] Dropping and recreating column family. new name: %s\n",
2669 cf
, new_name
.c_str());
2671 thread
->shared
->LockColumnFamily(cf
);
2672 Status s
= db_
->DropColumnFamily(column_families_
[cf
]);
2673 delete column_families_
[cf
];
2675 fprintf(stderr
, "dropping column family error: %s\n",
2676 s
.ToString().c_str());
2679 s
= db_
->CreateColumnFamily(ColumnFamilyOptions(options_
), new_name
,
2680 &column_families_
[cf
]);
2681 column_family_names_
[cf
] = new_name
;
2682 thread
->shared
->ClearColumnFamily(cf
);
2684 fprintf(stderr
, "creating column family error: %s\n",
2685 s
.ToString().c_str());
2688 thread
->shared
->UnlockColumnFamily(cf
);
  // Non-batched mode reads and updates per-key expected state, so the
  // per-key mutex must be held around each operation.
  virtual bool ShouldAcquireMutexOnKey() const { return true; }
2695 virtual Status
TestGet(ThreadState
* thread
,
2696 const ReadOptions
& read_opts
,
2697 const std::vector
<int>& rand_column_families
,
2698 const std::vector
<int64_t>& rand_keys
) {
2699 auto cfh
= column_families_
[rand_column_families
[0]];
2700 std::string key_str
= Key(rand_keys
[0]);
2701 Slice key
= key_str
;
2702 std::string from_db
;
2703 Status s
= db_
->Get(read_opts
, cfh
, key
, &from_db
);
2706 thread
->stats
.AddGets(1, 1);
2707 } else if (s
.IsNotFound()) {
2709 thread
->stats
.AddGets(1, 0);
2712 thread
->stats
.AddErrors(1);
2717 virtual Status
TestPrefixScan(ThreadState
* thread
,
2718 const ReadOptions
& read_opts
,
2719 const std::vector
<int>& rand_column_families
,
2720 const std::vector
<int64_t>& rand_keys
) {
2721 auto cfh
= column_families_
[rand_column_families
[0]];
2722 std::string key_str
= Key(rand_keys
[0]);
2723 Slice key
= key_str
;
2724 Slice prefix
= Slice(key
.data(), FLAGS_prefix_size
);
2726 std::string upper_bound
;
2728 ReadOptions ro_copy
= read_opts
;
2729 if (thread
->rand
.OneIn(2) && GetNextPrefix(prefix
, &upper_bound
)) {
2730 // For half of the time, set the upper bound to the next prefix
2731 ub_slice
= Slice(upper_bound
);
2732 ro_copy
.iterate_upper_bound
= &ub_slice
;
2735 Iterator
* iter
= db_
->NewIterator(ro_copy
, cfh
);
2737 for (iter
->Seek(prefix
);
2738 iter
->Valid() && iter
->key().starts_with(prefix
); iter
->Next()) {
2742 (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size
) * 8)));
2743 Status s
= iter
->status();
2744 if (iter
->status().ok()) {
2745 thread
->stats
.AddPrefixes(1, static_cast<int>(count
));
2747 thread
->stats
.AddErrors(1);
2753 virtual Status
TestPut(ThreadState
* thread
,
2754 WriteOptions
& write_opts
, const ReadOptions
& read_opts
,
2755 const std::vector
<int>& rand_column_families
,
2756 const std::vector
<int64_t>& rand_keys
,
2757 char (&value
) [100], std::unique_ptr
<MutexLock
>& lock
) {
2758 auto shared
= thread
->shared
;
2759 int64_t max_key
= shared
->GetMaxKey();
2760 int64_t rand_key
= rand_keys
[0];
2761 int rand_column_family
= rand_column_families
[0];
2762 while (!shared
->AllowsOverwrite(rand_key
) &&
2763 (FLAGS_use_merge
|| shared
->Exists(rand_column_family
, rand_key
))) {
2765 rand_key
= thread
->rand
.Next() % max_key
;
2766 rand_column_family
= thread
->rand
.Next() % FLAGS_column_families
;
2767 lock
.reset(new MutexLock(
2768 shared
->GetMutexForKey(rand_column_family
, rand_key
)));
2771 std::string key_str
= Key(rand_key
);
2772 Slice key
= key_str
;
2773 ColumnFamilyHandle
* cfh
= column_families_
[rand_column_family
];
2775 if (FLAGS_verify_before_write
) {
2776 std::string key_str2
= Key(rand_key
);
2778 std::string from_db
;
2779 Status s
= db_
->Get(read_opts
, cfh
, k
, &from_db
);
2780 if (!VerifyValue(rand_column_family
, rand_key
, read_opts
, shared
,
2781 from_db
, s
, true)) {
2785 uint32_t value_base
= thread
->rand
.Next() % shared
->UNKNOWN_SENTINEL
;
2786 size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
2788 shared
->Put(rand_column_family
, rand_key
, value_base
, true /* pending */);
2790 if (FLAGS_use_merge
) {
2791 if (!FLAGS_use_txn
) {
2792 s
= db_
->Merge(write_opts
, cfh
, key
, v
);
2794 #ifndef ROCKSDB_LITE
2796 s
= NewTxn(write_opts
, &txn
);
2798 s
= txn
->Merge(cfh
, key
, v
);
2806 if (!FLAGS_use_txn
) {
2807 s
= db_
->Put(write_opts
, cfh
, key
, v
);
2809 #ifndef ROCKSDB_LITE
2811 s
= NewTxn(write_opts
, &txn
);
2813 s
= txn
->Put(cfh
, key
, v
);
2821 shared
->Put(rand_column_family
, rand_key
, value_base
, false /* pending */);
2823 fprintf(stderr
, "put or merge error: %s\n", s
.ToString().c_str());
2826 thread
->stats
.AddBytesForWrites(1, sz
);
2827 PrintKeyValue(rand_column_family
, static_cast<uint32_t>(rand_key
),
2832 virtual Status
TestDelete(ThreadState
* thread
, WriteOptions
& write_opts
,
2833 const std::vector
<int>& rand_column_families
,
2834 const std::vector
<int64_t>& rand_keys
,
2835 std::unique_ptr
<MutexLock
>& lock
) {
2836 int64_t rand_key
= rand_keys
[0];
2837 int rand_column_family
= rand_column_families
[0];
2838 auto shared
= thread
->shared
;
2839 int64_t max_key
= shared
->GetMaxKey();
2842 // If the chosen key does not allow overwrite and it does not exist,
2843 // choose another key.
2844 while (!shared
->AllowsOverwrite(rand_key
) &&
2845 !shared
->Exists(rand_column_family
, rand_key
)) {
2847 rand_key
= thread
->rand
.Next() % max_key
;
2848 rand_column_family
= thread
->rand
.Next() % FLAGS_column_families
;
2849 lock
.reset(new MutexLock(
2850 shared
->GetMutexForKey(rand_column_family
, rand_key
)));
2853 std::string key_str
= Key(rand_key
);
2854 Slice key
= key_str
;
2855 auto cfh
= column_families_
[rand_column_family
];
2857 // Use delete if the key may be overwritten and a single deletion
2860 if (shared
->AllowsOverwrite(rand_key
)) {
2861 shared
->Delete(rand_column_family
, rand_key
, true /* pending */);
2862 if (!FLAGS_use_txn
) {
2863 s
= db_
->Delete(write_opts
, cfh
, key
);
2865 #ifndef ROCKSDB_LITE
2867 s
= NewTxn(write_opts
, &txn
);
2869 s
= txn
->Delete(cfh
, key
);
2876 shared
->Delete(rand_column_family
, rand_key
, false /* pending */);
2877 thread
->stats
.AddDeletes(1);
2879 fprintf(stderr
, "delete error: %s\n", s
.ToString().c_str());
2883 shared
->SingleDelete(rand_column_family
, rand_key
, true /* pending */);
2884 if (!FLAGS_use_txn
) {
2885 s
= db_
->SingleDelete(write_opts
, cfh
, key
);
2887 #ifndef ROCKSDB_LITE
2889 s
= NewTxn(write_opts
, &txn
);
2891 s
= txn
->SingleDelete(cfh
, key
);
2898 shared
->SingleDelete(rand_column_family
, rand_key
, false /* pending */);
2899 thread
->stats
.AddSingleDeletes(1);
2901 fprintf(stderr
, "single delete error: %s\n",
2902 s
.ToString().c_str());
2909 virtual Status
TestDeleteRange(ThreadState
* thread
,
2910 WriteOptions
& write_opts
,
2911 const std::vector
<int>& rand_column_families
,
2912 const std::vector
<int64_t>& rand_keys
,
2913 std::unique_ptr
<MutexLock
>& lock
) {
2914 // OPERATION delete range
2915 std::vector
<std::unique_ptr
<MutexLock
>> range_locks
;
2916 // delete range does not respect disallowed overwrites. the keys for
2917 // which overwrites are disallowed are randomly distributed so it
2918 // could be expensive to find a range where each key allows
2920 int64_t rand_key
= rand_keys
[0];
2921 int rand_column_family
= rand_column_families
[0];
2922 auto shared
= thread
->shared
;
2923 int64_t max_key
= shared
->GetMaxKey();
2924 if (rand_key
> max_key
- FLAGS_range_deletion_width
) {
2926 rand_key
= thread
->rand
.Next() %
2927 (max_key
- FLAGS_range_deletion_width
+ 1);
2928 range_locks
.emplace_back(new MutexLock(
2929 shared
->GetMutexForKey(rand_column_family
, rand_key
)));
2931 range_locks
.emplace_back(std::move(lock
));
2933 for (int j
= 1; j
< FLAGS_range_deletion_width
; ++j
) {
2934 if (((rand_key
+ j
) & ((1 << FLAGS_log2_keys_per_lock
) - 1)) == 0) {
2935 range_locks
.emplace_back(new MutexLock(
2936 shared
->GetMutexForKey(rand_column_family
, rand_key
+ j
)));
2939 shared
->DeleteRange(rand_column_family
, rand_key
,
2940 rand_key
+ FLAGS_range_deletion_width
,
2941 true /* pending */);
2943 std::string keystr
= Key(rand_key
);
2945 auto cfh
= column_families_
[rand_column_family
];
2946 std::string end_keystr
= Key(rand_key
+ FLAGS_range_deletion_width
);
2947 Slice end_key
= end_keystr
;
2948 Status s
= db_
->DeleteRange(write_opts
, cfh
, key
, end_key
);
2950 fprintf(stderr
, "delete range error: %s\n",
2951 s
.ToString().c_str());
2954 int covered
= shared
->DeleteRange(
2955 rand_column_family
, rand_key
,
2956 rand_key
+ FLAGS_range_deletion_width
, false /* pending */);
2957 thread
->stats
.AddRangeDeletions(1);
2958 thread
->stats
.AddCoveredByRangeDeletions(covered
);
2963 virtual void TestIngestExternalFile(
2964 ThreadState
* /* thread */,
2965 const std::vector
<int>& /* rand_column_families */,
2966 const std::vector
<int64_t>& /* rand_keys */,
2967 std::unique_ptr
<MutexLock
>& /* lock */) {
2970 "RocksDB lite does not support "
2971 "TestIngestExternalFile\n");
2975 virtual void TestIngestExternalFile(
2976 ThreadState
* thread
, const std::vector
<int>& rand_column_families
,
2977 const std::vector
<int64_t>& rand_keys
, std::unique_ptr
<MutexLock
>& lock
) {
2978 const std::string sst_filename
=
2979 FLAGS_db
+ "/." + ToString(thread
->tid
) + ".sst";
2981 if (FLAGS_env
->FileExists(sst_filename
).ok()) {
2982 // Maybe we terminated abnormally before, so cleanup to give this file
2983 // ingestion a clean slate
2984 s
= FLAGS_env
->DeleteFile(sst_filename
);
2987 SstFileWriter
sst_file_writer(EnvOptions(), options_
);
2989 s
= sst_file_writer
.Open(sst_filename
);
2991 int64_t key_base
= rand_keys
[0];
2992 int column_family
= rand_column_families
[0];
2993 std::vector
<std::unique_ptr
<MutexLock
> > range_locks
;
2994 std::vector
<uint32_t> values
;
2995 SharedState
* shared
= thread
->shared
;
2997 // Grab locks, set pending state on expected values, and add keys
2998 for (int64_t key
= key_base
;
2999 s
.ok() && key
< std::min(key_base
+ FLAGS_ingest_external_file_width
,
3000 shared
->GetMaxKey());
3002 if (key
== key_base
) {
3003 range_locks
.emplace_back(std::move(lock
));
3004 } else if ((key
& ((1 << FLAGS_log2_keys_per_lock
) - 1)) == 0) {
3005 range_locks
.emplace_back(
3006 new MutexLock(shared
->GetMutexForKey(column_family
, key
)));
3009 uint32_t value_base
= thread
->rand
.Next() % shared
->UNKNOWN_SENTINEL
;
3010 values
.push_back(value_base
);
3011 shared
->Put(column_family
, key
, value_base
, true /* pending */);
3014 size_t value_len
= GenerateValue(value_base
, value
, sizeof(value
));
3015 auto key_str
= Key(key
);
3016 s
= sst_file_writer
.Put(Slice(key_str
), Slice(value
, value_len
));
3020 s
= sst_file_writer
.Finish();
3023 s
= db_
->IngestExternalFile(column_families_
[column_family
],
3024 {sst_filename
}, IngestExternalFileOptions());
3027 fprintf(stderr
, "file ingestion error: %s\n", s
.ToString().c_str());
3030 int64_t key
= key_base
;
3031 for (int32_t value
: values
) {
3032 shared
->Put(column_family
, key
, value
, false /* pending */);
3036 #endif // ROCKSDB_LITE
3038 bool VerifyValue(int cf
, int64_t key
, const ReadOptions
& /*opts*/,
3039 SharedState
* shared
, const std::string
& value_from_db
,
3040 Status s
, bool strict
= false) const {
3041 if (shared
->HasVerificationFailedYet()) {
3044 // compare value_from_db with the value in the shared state
3045 char value
[kValueMaxLen
];
3046 uint32_t value_base
= shared
->Get(cf
, key
);
3047 if (value_base
== SharedState::UNKNOWN_SENTINEL
) {
3050 if (value_base
== SharedState::DELETION_SENTINEL
&& !strict
) {
3055 if (value_base
== SharedState::DELETION_SENTINEL
) {
3056 VerificationAbort(shared
, "Unexpected value found", cf
, key
);
3059 size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
3060 if (value_from_db
.length() != sz
) {
3061 VerificationAbort(shared
, "Length of value read is not equal", cf
, key
);
3064 if (memcmp(value_from_db
.data(), value
, sz
) != 0) {
3065 VerificationAbort(shared
, "Contents of value read don't match", cf
,
3070 if (value_base
!= SharedState::DELETION_SENTINEL
) {
3071 VerificationAbort(shared
, "Value not found: " + s
.ToString(), cf
, key
);
3079 class BatchedOpsStressTest
: public StressTest
{
3081 BatchedOpsStressTest() {}
3082 virtual ~BatchedOpsStressTest() {}
3084 // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
3085 // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
3086 // Also refer BatchedOpsStressTest::TestGet
3087 virtual Status
TestPut(ThreadState
* thread
,
3088 WriteOptions
& write_opts
, const ReadOptions
& /* read_opts */,
3089 const std::vector
<int>& rand_column_families
, const std::vector
<int64_t>& rand_keys
,
3090 char (&value
)[100], std::unique_ptr
<MutexLock
>& /* lock */) {
3091 uint32_t value_base
=
3092 thread
->rand
.Next() % thread
->shared
->UNKNOWN_SENTINEL
;
3093 size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
3095 std::string keys
[10] = {"9", "8", "7", "6", "5",
3096 "4", "3", "2", "1", "0"};
3097 std::string values
[10] = {"9", "8", "7", "6", "5",
3098 "4", "3", "2", "1", "0"};
3099 Slice value_slices
[10];
3102 auto cfh
= column_families_
[rand_column_families
[0]];
3103 std::string key_str
= Key(rand_keys
[0]);
3104 for (int i
= 0; i
< 10; i
++) {
3106 values
[i
] += v
.ToString();
3107 value_slices
[i
] = values
[i
];
3108 if (FLAGS_use_merge
) {
3109 batch
.Merge(cfh
, keys
[i
], value_slices
[i
]);
3111 batch
.Put(cfh
, keys
[i
], value_slices
[i
]);
3115 s
= db_
->Write(write_opts
, &batch
);
3117 fprintf(stderr
, "multiput error: %s\n", s
.ToString().c_str());
3118 thread
->stats
.AddErrors(1);
3120 // we did 10 writes each of size sz + 1
3121 thread
->stats
.AddBytesForWrites(10, (sz
+ 1) * 10);
3127 // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
3128 // in DB atomically i.e in a single batch. Also refer MultiGet.
3129 virtual Status
TestDelete(ThreadState
* thread
, WriteOptions
& writeoptions
,
3130 const std::vector
<int>& rand_column_families
,
3131 const std::vector
<int64_t>& rand_keys
,
3132 std::unique_ptr
<MutexLock
>& /* lock */) {
3133 std::string keys
[10] = {"9", "7", "5", "3", "1",
3134 "8", "6", "4", "2", "0"};
3138 auto cfh
= column_families_
[rand_column_families
[0]];
3139 std::string key_str
= Key(rand_keys
[0]);
3140 for (int i
= 0; i
< 10; i
++) {
3142 batch
.Delete(cfh
, keys
[i
]);
3145 s
= db_
->Write(writeoptions
, &batch
);
3147 fprintf(stderr
, "multidelete error: %s\n", s
.ToString().c_str());
3148 thread
->stats
.AddErrors(1);
3150 thread
->stats
.AddDeletes(10);
3156 virtual Status
TestDeleteRange(ThreadState
* /* thread */,
3157 WriteOptions
& /* write_opts */,
3158 const std::vector
<int>& /* rand_column_families */,
3159 const std::vector
<int64_t>& /* rand_keys */,
3160 std::unique_ptr
<MutexLock
>& /* lock */) {
3162 return Status::NotSupported("BatchedOpsStressTest does not support "
3166 virtual void TestIngestExternalFile(
3167 ThreadState
* /* thread */,
3168 const std::vector
<int>& /* rand_column_families */,
3169 const std::vector
<int64_t>& /* rand_keys */,
3170 std::unique_ptr
<MutexLock
>& /* lock */) {
3173 "BatchedOpsStressTest does not support "
3174 "TestIngestExternalFile\n");
3178 // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
3179 // in the same snapshot, and verifies that all the values are of the form
3180 // "0"+V, "1"+V,..."9"+V.
3181 // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
3183 virtual Status
TestGet(ThreadState
* thread
, const ReadOptions
& readoptions
,
3184 const std::vector
<int>& rand_column_families
,
3185 const std::vector
<int64_t>& rand_keys
) {
3186 std::string keys
[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
3187 Slice key_slices
[10];
3188 std::string values
[10];
3189 ReadOptions readoptionscopy
= readoptions
;
3190 readoptionscopy
.snapshot
= db_
->GetSnapshot();
3191 std::string key_str
= Key(rand_keys
[0]);
3192 Slice key
= key_str
;
3193 auto cfh
= column_families_
[rand_column_families
[0]];
3194 std::string from_db
;
3196 for (int i
= 0; i
< 10; i
++) {
3197 keys
[i
] += key
.ToString();
3198 key_slices
[i
] = keys
[i
];
3199 s
= db_
->Get(readoptionscopy
, cfh
, key_slices
[i
], &from_db
);
3200 if (!s
.ok() && !s
.IsNotFound()) {
3201 fprintf(stderr
, "get error: %s\n", s
.ToString().c_str());
3203 thread
->stats
.AddErrors(1);
3204 // we continue after error rather than exiting so that we can
3205 // find more errors if any
3206 } else if (s
.IsNotFound()) {
3208 thread
->stats
.AddGets(1, 0);
3210 values
[i
] = from_db
;
3212 char expected_prefix
= (keys
[i
])[0];
3213 char actual_prefix
= (values
[i
])[0];
3214 if (actual_prefix
!= expected_prefix
) {
3215 fprintf(stderr
, "error expected prefix = %c actual = %c\n",
3216 expected_prefix
, actual_prefix
);
3218 (values
[i
])[0] = ' '; // blank out the differing character
3219 thread
->stats
.AddGets(1, 1);
3222 db_
->ReleaseSnapshot(readoptionscopy
.snapshot
);
3224 // Now that we retrieved all values, check that they all match
3225 for (int i
= 1; i
< 10; i
++) {
3226 if (values
[i
] != values
[0]) {
3227 fprintf(stderr
, "error : inconsistent values for key %s: %s, %s\n",
3228 key
.ToString(true).c_str(), StringToHex(values
[0]).c_str(),
3229 StringToHex(values
[i
]).c_str());
3230 // we continue after error rather than exiting so that we can
3231 // find more errors if any
3238 // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
3239 // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
3240 // of the key. Each of these 10 scans returns a series of values;
3241 // each series should be the same length, and it is verified for each
3242 // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
3243 // ASSUMES that MultiPut was used to put (K, V)
3244 virtual Status
TestPrefixScan(ThreadState
* thread
, const ReadOptions
& readoptions
,
3245 const std::vector
<int>& rand_column_families
,
3246 const std::vector
<int64_t>& rand_keys
) {
3247 std::string key_str
= Key(rand_keys
[0]);
3248 Slice key
= key_str
;
3249 auto cfh
= column_families_
[rand_column_families
[0]];
3250 std::string prefixes
[10] = {"0", "1", "2", "3", "4",
3251 "5", "6", "7", "8", "9"};
3252 Slice prefix_slices
[10];
3253 ReadOptions readoptionscopy
[10];
3254 const Snapshot
* snapshot
= db_
->GetSnapshot();
3255 Iterator
* iters
[10];
3256 std::string upper_bounds
[10];
3257 Slice ub_slices
[10];
3258 Status s
= Status::OK();
3259 for (int i
= 0; i
< 10; i
++) {
3260 prefixes
[i
] += key
.ToString();
3261 prefixes
[i
].resize(FLAGS_prefix_size
);
3262 prefix_slices
[i
] = Slice(prefixes
[i
]);
3263 readoptionscopy
[i
] = readoptions
;
3264 readoptionscopy
[i
].snapshot
= snapshot
;
3265 if (thread
->rand
.OneIn(2) &&
3266 GetNextPrefix(prefix_slices
[i
], &(upper_bounds
[i
]))) {
3267 // For half of the time, set the upper bound to the next prefix
3268 ub_slices
[i
] = Slice(upper_bounds
[i
]);
3269 readoptionscopy
[i
].iterate_upper_bound
= &(ub_slices
[i
]);
3271 iters
[i
] = db_
->NewIterator(readoptionscopy
[i
], cfh
);
3272 iters
[i
]->Seek(prefix_slices
[i
]);
3276 while (iters
[0]->Valid() && iters
[0]->key().starts_with(prefix_slices
[0])) {
3278 std::string values
[10];
3279 // get list of all values for this iteration
3280 for (int i
= 0; i
< 10; i
++) {
3281 // no iterator should finish before the first one
3282 assert(iters
[i
]->Valid() &&
3283 iters
[i
]->key().starts_with(prefix_slices
[i
]));
3284 values
[i
] = iters
[i
]->value().ToString();
3286 char expected_first
= (prefixes
[i
])[0];
3287 char actual_first
= (values
[i
])[0];
3289 if (actual_first
!= expected_first
) {
3290 fprintf(stderr
, "error expected first = %c actual = %c\n",
3291 expected_first
, actual_first
);
3293 (values
[i
])[0] = ' '; // blank out the differing character
3295 // make sure all values are equivalent
3296 for (int i
= 0; i
< 10; i
++) {
3297 if (values
[i
] != values
[0]) {
3298 fprintf(stderr
, "error : %d, inconsistent values for prefix %s: %s, %s\n",
3299 i
, prefixes
[i
].c_str(), StringToHex(values
[0]).c_str(),
3300 StringToHex(values
[i
]).c_str());
3301 // we continue after error rather than exiting so that we can
3302 // find more errors if any
3308 // cleanup iterators and snapshot
3309 for (int i
= 0; i
< 10; i
++) {
3310 // if the first iterator finished, they should have all finished
3311 assert(!iters
[i
]->Valid() ||
3312 !iters
[i
]->key().starts_with(prefix_slices
[i
]));
3313 assert(iters
[i
]->status().ok());
3316 db_
->ReleaseSnapshot(snapshot
);
3319 thread
->stats
.AddPrefixes(1, count
);
3321 thread
->stats
.AddErrors(1);
3327 virtual void VerifyDb(ThreadState
* /* thread */) const {}
3330 } // namespace rocksdb
3332 int main(int argc
, char** argv
) {
3333 SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv
[0]) +
3335 ParseCommandLineFlags(&argc
, &argv
, true);
3337 if (FLAGS_statistics
) {
3338 dbstats
= rocksdb::CreateDBStatistics();
3340 FLAGS_compression_type_e
=
3341 StringToCompressionType(FLAGS_compression_type
.c_str());
3342 FLAGS_checksum_type_e
= StringToChecksumType(FLAGS_checksum_type
.c_str());
3343 if (!FLAGS_hdfs
.empty()) {
3344 FLAGS_env
= new rocksdb::HdfsEnv(FLAGS_hdfs
);
3346 FLAGS_rep_factory
= StringToRepFactory(FLAGS_memtablerep
.c_str());
3348 // The number of background threads should be at least as much the
3349 // max number of concurrent compactions.
3350 FLAGS_env
->SetBackgroundThreads(FLAGS_max_background_compactions
);
3351 FLAGS_env
->SetBackgroundThreads(FLAGS_num_bottom_pri_threads
,
3352 rocksdb::Env::Priority::BOTTOM
);
3353 if (FLAGS_prefixpercent
> 0 && FLAGS_prefix_size
<= 0) {
3355 "Error: prefixpercent is non-zero while prefix_size is "
3359 if (FLAGS_test_batches_snapshots
&& FLAGS_prefix_size
<= 0) {
3361 "Error: please specify prefix_size for "
3362 "test_batches_snapshots test!\n");
3365 if (FLAGS_memtable_prefix_bloom_size_ratio
> 0.0 && FLAGS_prefix_size
<= 0) {
3367 "Error: please specify positive prefix_size in order to use "
3368 "memtable_prefix_bloom_size_ratio\n");
3371 if ((FLAGS_readpercent
+ FLAGS_prefixpercent
+
3372 FLAGS_writepercent
+ FLAGS_delpercent
+ FLAGS_delrangepercent
+
3373 FLAGS_iterpercent
) != 100) {
3375 "Error: Read+Prefix+Write+Delete+DeleteRange+Iterate percents != "
3379 if (FLAGS_disable_wal
== 1 && FLAGS_reopen
> 0) {
3380 fprintf(stderr
, "Error: Db cannot reopen safely with disable_wal set!\n");
3383 if ((unsigned)FLAGS_reopen
>= FLAGS_ops_per_thread
) {
3385 "Error: #DB-reopens should be < ops_per_thread\n"
3386 "Provided reopens = %d and ops_per_thread = %lu\n",
3388 (unsigned long)FLAGS_ops_per_thread
);
3391 if (FLAGS_test_batches_snapshots
&& FLAGS_delrangepercent
> 0) {
3392 fprintf(stderr
, "Error: nonzero delrangepercent unsupported in "
3393 "test_batches_snapshots mode\n");
3396 if (FLAGS_active_width
> FLAGS_max_key
) {
3397 fprintf(stderr
, "Error: active_width can be at most max_key\n");
3399 } else if (FLAGS_active_width
== 0) {
3400 FLAGS_active_width
= FLAGS_max_key
;
3402 if (FLAGS_value_size_mult
* kRandomValueMaxFactor
> kValueMaxLen
) {
3403 fprintf(stderr
, "Error: value_size_mult can be at most %d\n",
3404 kValueMaxLen
/ kRandomValueMaxFactor
);
3407 if (FLAGS_use_merge
&& FLAGS_nooverwritepercent
== 100) {
3410 "Error: nooverwritepercent must not be 100 when using merge operands");
3413 if (FLAGS_ingest_external_file_one_in
> 0 && FLAGS_nooverwritepercent
> 0) {
3415 "Error: nooverwritepercent must be 0 when using file ingestion\n");
3419 // Choose a location for the test database if none given with --db=<path>
3420 if (FLAGS_db
.empty()) {
3421 std::string default_db_path
;
3422 rocksdb::Env::Default()->GetTestDirectory(&default_db_path
);
3423 default_db_path
+= "/dbstress";
3424 FLAGS_db
= default_db_path
;
3427 rocksdb_kill_odds
= FLAGS_kill_random_test
;
3428 rocksdb_kill_prefix_blacklist
= SplitString(FLAGS_kill_prefix_blacklist
);
3430 std::unique_ptr
<rocksdb::StressTest
> stress
;
3431 if (FLAGS_test_batches_snapshots
) {
3432 stress
.reset(new rocksdb::BatchedOpsStressTest());
3434 stress
.reset(new rocksdb::NonBatchedOpsStressTest());
3436 if (stress
->Run()) {