]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/tools/db_stress.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / tools / db_stress.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9//
10// The test uses an array to compare against values written to the database.
11// Keys written to the array are in 1:1 correspondence to the actual values in
12// the database according to the formula in the function GenerateValue.
13
14// Space is reserved in the array from 0 to FLAGS_max_key and values are
15// randomly written/deleted/read from those positions. During verification we
16// compare all the positions in the array. To shorten/elongate the running
17// time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread,
18// (sometimes also FLAGS_threads).
19//
20// NOTE that if FLAGS_test_batches_snapshots is set, the test will have
21// different behavior. See comment of the flag for details.
22
23#ifndef GFLAGS
24#include <cstdio>
25int main() {
26 fprintf(stderr, "Please install gflags to run rocksdb tools\n");
27 return 1;
28}
29#else
30
11fdf7f2 31#ifndef __STDC_FORMAT_MACROS
7c673cae 32#define __STDC_FORMAT_MACROS
11fdf7f2
TL
33#endif // __STDC_FORMAT_MACROS
34
35#include <fcntl.h>
7c673cae
FG
36#include <inttypes.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <sys/types.h>
40#include <algorithm>
41#include <chrono>
42#include <exception>
11fdf7f2 43#include <queue>
7c673cae
FG
44#include <thread>
45
7c673cae
FG
46#include "db/db_impl.h"
47#include "db/version_set.h"
48#include "hdfs/env_hdfs.h"
49#include "monitoring/histogram.h"
11fdf7f2 50#include "options/options_helper.h"
7c673cae
FG
51#include "port/port.h"
52#include "rocksdb/cache.h"
53#include "rocksdb/env.h"
54#include "rocksdb/slice.h"
55#include "rocksdb/slice_transform.h"
56#include "rocksdb/statistics.h"
11fdf7f2
TL
57#include "rocksdb/utilities/backupable_db.h"
58#include "rocksdb/utilities/checkpoint.h"
7c673cae 59#include "rocksdb/utilities/db_ttl.h"
11fdf7f2
TL
60#include "rocksdb/utilities/options_util.h"
61#include "rocksdb/utilities/transaction.h"
62#include "rocksdb/utilities/transaction_db.h"
7c673cae
FG
63#include "rocksdb/write_batch.h"
64#include "util/coding.h"
65#include "util/compression.h"
66#include "util/crc32c.h"
11fdf7f2 67#include "util/gflags_compat.h"
7c673cae
FG
68#include "util/logging.h"
69#include "util/mutexlock.h"
70#include "util/random.h"
71#include "util/string_util.h"
11fdf7f2
TL
72// SyncPoint is not supported in Released Windows Mode.
73#if !(defined NDEBUG) || !defined(OS_WIN)
74#include "util/sync_point.h"
75#endif // !(defined NDEBUG) || !defined(OS_WIN)
7c673cae 76#include "util/testutil.h"
11fdf7f2 77
7c673cae
FG
78#include "utilities/merge_operators.h"
79
11fdf7f2
TL
80using GFLAGS_NAMESPACE::ParseCommandLineFlags;
81using GFLAGS_NAMESPACE::RegisterFlagValidator;
82using GFLAGS_NAMESPACE::SetUsageMessage;
7c673cae
FG
83
84static const long KB = 1024;
11fdf7f2
TL
85static const int kRandomValueMaxFactor = 3;
86static const int kValueMaxLen = 100;
7c673cae
FG
87
// gflags validator: a flag value must fit in a uint32_t even though the
// flag itself is declared as uint64. Returns true iff the value is in range.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  constexpr uint64_t kMaxUint32 = std::numeric_limits<uint32_t>::max();
  if (value <= kMaxUint32) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
          (unsigned long)value);
  return false;
}
98
99DEFINE_uint64(seed, 2341234, "Seed for PRNG");
11fdf7f2 100static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
7c673cae
FG
101 RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
102
494da23a
TL
103DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");
104
7c673cae
FG
105DEFINE_int64(max_key, 1 * KB* KB,
106 "Max number of key/values to place in database");
107
108DEFINE_int32(column_families, 10, "Number of column families");
109
11fdf7f2
TL
110DEFINE_string(
111 options_file, "",
112 "The path to a RocksDB options file. If specified, then db_stress will "
113 "run with the RocksDB options in the default column family of the "
114 "specified options file. Note that, when an options file is provided, "
115 "db_stress will ignore the flag values for all options that may be passed "
116 "via options file.");
117
118DEFINE_int64(
119 active_width, 0,
120 "Number of keys in active span of the key-range at any given time. The "
121 "span begins with its left endpoint at key 0, gradually moves rightwards, "
122 "and ends with its right endpoint at max_key. If set to 0, active_width "
123 "will be sanitized to be equal to max_key.");
124
7c673cae
FG
125// TODO(noetzli) Add support for single deletes
126DEFINE_bool(test_batches_snapshots, false,
127 "If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
128 " which read/write/delete multiple keys in a batch. In this mode,"
129 " we do not verify db content by comparing the content with the "
130 "pre-allocated array. Instead, we do partial verification inside"
131 " MultiGet() by checking various values in a batch. Benefit of"
132 " this mode:\n"
133 "\t(a) No need to acquire mutexes during writes (less cache "
134 "flushes in multi-core leading to speed up)\n"
135 "\t(b) No long validation at the end (more speed up)\n"
136 "\t(c) Test snapshot and atomicity of batch writes");
137
494da23a
TL
138DEFINE_bool(atomic_flush, false,
139 "If set, enables atomic flush in the options.\n");
140
141DEFINE_bool(test_atomic_flush, false,
142 "If set, runs the stress test dedicated to verifying atomic flush "
143 "functionality. Setting this implies `atomic_flush=true`.\n");
144
7c673cae
FG
145DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
146
147DEFINE_int32(ttl, -1,
148 "Opens the db with this ttl value if this is not -1. "
149 "Carefully specify a large value such that verifications on "
150 "deleted values don't fail");
151
152DEFINE_int32(value_size_mult, 8,
153 "Size of value will be this number times rand_int(1,3) bytes");
154
155DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
156
11fdf7f2
TL
157DEFINE_bool(enable_pipelined_write, false, "Pipeline WAL/memtable writes");
158
7c673cae
FG
159DEFINE_bool(verify_before_write, false, "Verify before write");
160
161DEFINE_bool(histogram, false, "Print histogram of operation timings");
162
163DEFINE_bool(destroy_db_initially, true,
164 "Destroys the database dir before start if this is true");
165
166DEFINE_bool(verbose, false, "Verbose");
167
168DEFINE_bool(progress_reports, true,
169 "If true, db_stress will report number of finished operations");
170
171DEFINE_uint64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
172 "Number of bytes to buffer in all memtables before compacting");
173
174DEFINE_int32(write_buffer_size,
175 static_cast<int32_t>(rocksdb::Options().write_buffer_size),
176 "Number of bytes to buffer in memtable before compacting");
177
178DEFINE_int32(max_write_buffer_number,
179 rocksdb::Options().max_write_buffer_number,
180 "The number of in-memory memtables. "
181 "Each memtable is of size FLAGS_write_buffer_size.");
182
183DEFINE_int32(min_write_buffer_number_to_merge,
184 rocksdb::Options().min_write_buffer_number_to_merge,
185 "The minimum number of write buffers that will be merged together "
186 "before writing to storage. This is cheap because it is an "
187 "in-memory merge. If this feature is not enabled, then all these "
188 "write buffers are flushed to L0 as separate files and this "
189 "increases read amplification because a get request has to check "
190 "in all of these files. Also, an in-memory merge may result in "
191 "writing less data to storage if there are duplicate records in"
192 " each of these individual write buffers.");
193
194DEFINE_int32(max_write_buffer_number_to_maintain,
195 rocksdb::Options().max_write_buffer_number_to_maintain,
196 "The total maximum number of write buffers to maintain in memory "
197 "including copies of buffers that have already been flushed. "
198 "Unlike max_write_buffer_number, this parameter does not affect "
199 "flushing. This controls the minimum amount of write history "
200 "that will be available in memory for conflict checking when "
201 "Transactions are used. If this value is too low, some "
202 "transactions may fail at commit time due to not being able to "
203 "determine whether there were any write conflicts. Setting this "
204 "value to 0 will cause write buffers to be freed immediately "
205 "after they are flushed. If this value is set to -1, "
206 "'max_write_buffer_number' will be used.");
207
11fdf7f2
TL
208DEFINE_double(memtable_prefix_bloom_size_ratio,
209 rocksdb::Options().memtable_prefix_bloom_size_ratio,
210 "creates prefix blooms for memtables, each with size "
211 "`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
212
494da23a
TL
213DEFINE_bool(memtable_whole_key_filtering,
214 rocksdb::Options().memtable_whole_key_filtering,
215 "Enable whole key filtering in memtables.");
216
7c673cae
FG
217DEFINE_int32(open_files, rocksdb::Options().max_open_files,
218 "Maximum number of files to keep open at the same time "
219 "(use default if == 0)");
220
221DEFINE_int64(compressed_cache_size, -1,
222 "Number of bytes to use as a cache of compressed data."
223 " Negative means use default settings.");
224
225DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, "");
226
227DEFINE_int32(level0_file_num_compaction_trigger,
228 rocksdb::Options().level0_file_num_compaction_trigger,
229 "Level0 compaction start trigger");
230
231DEFINE_int32(level0_slowdown_writes_trigger,
232 rocksdb::Options().level0_slowdown_writes_trigger,
233 "Number of files in level-0 that will slow down writes");
234
235DEFINE_int32(level0_stop_writes_trigger,
236 rocksdb::Options().level0_stop_writes_trigger,
237 "Number of files in level-0 that will trigger put stop.");
238
239DEFINE_int32(block_size,
240 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
241 "Number of bytes in a block.");
242
11fdf7f2
TL
243DEFINE_int32(
244 format_version,
245 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
246 "Format version of SST files.");
247
248DEFINE_int32(index_block_restart_interval,
249 rocksdb::BlockBasedTableOptions().index_block_restart_interval,
250 "Number of keys between restart points "
251 "for delta encoding of keys in index block.");
252
7c673cae
FG
253DEFINE_int32(max_background_compactions,
254 rocksdb::Options().max_background_compactions,
255 "The maximum number of concurrent background compactions "
256 "that can occur in parallel.");
257
11fdf7f2
TL
258DEFINE_int32(num_bottom_pri_threads, 0,
259 "The number of threads in the bottom-priority thread pool (used "
260 "by universal compaction only).");
261
7c673cae
FG
262DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
263 "The interval (in milliseconds) to adjust compaction thread pool "
264 "size. Don't change it periodically if the value is 0.");
265
266DEFINE_int32(compaction_thread_pool_variations, 2,
267 "Range of background thread pool size variations when adjusted "
268 "periodically.");
269
270DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
271 "The maximum number of concurrent background flushes "
272 "that can occur in parallel.");
273
274DEFINE_int32(universal_size_ratio, 0, "The ratio of file sizes that trigger"
275 " compaction in universal style");
276
277DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files to "
278 "compact in universal style compaction");
279
280DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
281 " in universal style compaction");
282
283DEFINE_int32(universal_max_size_amplification_percent, 0,
284 "The max size amplification for universal style compaction");
285
286DEFINE_int32(clear_column_family_one_in, 1000000,
287 "With a chance of 1/N, delete a column family and then recreate "
288 "it again. If N == 0, never drop/create column families. "
289 "When test_batches_snapshots is true, this flag has no effect");
290
291DEFINE_int32(set_options_one_in, 0,
292 "With a chance of 1/N, change some random options");
293
294DEFINE_int32(set_in_place_one_in, 0,
295 "With a chance of 1/N, toggle in place support option");
296
297DEFINE_int64(cache_size, 2LL * KB * KB * KB,
298 "Number of bytes to use as a cache of uncompressed data.");
299
300DEFINE_bool(use_clock_cache, false,
301 "Replace default LRU block cache with clock cache.");
302
303DEFINE_uint64(subcompactions, 1,
304 "Maximum number of subcompactions to divide L0-L1 compactions "
305 "into.");
306
11fdf7f2 307DEFINE_bool(allow_concurrent_memtable_write, false,
7c673cae
FG
308 "Allow multi-writers to update mem tables in parallel.");
309
310DEFINE_bool(enable_write_thread_adaptive_yield, true,
311 "Use a yielding spin loop for brief writer thread waits.");
312
11fdf7f2 313static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
7c673cae
FG
314 RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
315
// gflags validator: the flag must be non-negative.
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
  if (value >= 0) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n",
          flagname, value);
  return false;
}
324DEFINE_int32(reopen, 10, "Number of times database reopens");
11fdf7f2 325static const bool FLAGS_reopen_dummy __attribute__((__unused__)) =
7c673cae
FG
326 RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive);
327
328DEFINE_int32(bloom_bits, 10, "Bloom filter bits per key. "
329 "Negative means use default settings.");
330
331DEFINE_bool(use_block_based_filter, false, "use block based filter"
332 "instead of full filter for block based table");
333
334DEFINE_string(db, "", "Use the db with the following name.");
335
11fdf7f2
TL
336DEFINE_string(
337 expected_values_path, "",
338 "File where the array of expected uint32_t values will be stored. If "
339 "provided and non-empty, the DB state will be verified against these "
340 "values after recovery. --max_key and --column_family must be kept the "
341 "same across invocations of this program that use the same "
342 "--expected_values_path.");
343
7c673cae
FG
344DEFINE_bool(verify_checksum, false,
345 "Verify checksum for every block read from storage");
346
347DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
348 "Allow reads to occur via mmap-ing files");
349
350DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
351 "Allow writes to occur via mmap-ing files");
352
353DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
354 "Use O_DIRECT for reading data");
355
356DEFINE_bool(use_direct_io_for_flush_and_compaction,
357 rocksdb::Options().use_direct_io_for_flush_and_compaction,
358 "Use O_DIRECT for writing data");
359
360// Database statistics
361static std::shared_ptr<rocksdb::Statistics> dbstats;
362DEFINE_bool(statistics, false, "Create database statistics");
363
364DEFINE_bool(sync, false, "Sync all writes to disk");
365
366DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
367
368DEFINE_int32(kill_random_test, 0,
369 "If non-zero, kill at various points in source code with "
370 "probability 1/this");
11fdf7f2 371static const bool FLAGS_kill_random_test_dummy __attribute__((__unused__)) =
7c673cae
FG
372 RegisterFlagValidator(&FLAGS_kill_random_test, &ValidateInt32Positive);
373extern int rocksdb_kill_odds;
374
375DEFINE_string(kill_prefix_blacklist, "",
376 "If non-empty, kill points with prefix in the list given will be"
377 " skipped. Items are comma-separated.");
378extern std::vector<std::string> rocksdb_kill_prefix_blacklist;
379
380DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
381
494da23a
TL
382DEFINE_uint64(recycle_log_file_num, rocksdb::Options().recycle_log_file_num,
383 "Number of old WAL files to keep around for later recycling");
384
11fdf7f2 385DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
7c673cae
FG
386 "Target level-1 file size for compaction");
387
388DEFINE_int32(target_file_size_multiplier, 1,
389 "A multiplier to compute target level-N file size (N >= 2)");
390
11fdf7f2
TL
391DEFINE_uint64(max_bytes_for_level_base,
392 rocksdb::Options().max_bytes_for_level_base,
393 "Max bytes for level-1");
7c673cae
FG
394
395DEFINE_double(max_bytes_for_level_multiplier, 2,
396 "A multiplier to compute max bytes for level-N (N >= 2)");
397
398DEFINE_int32(range_deletion_width, 10,
399 "The width of the range deletion intervals.");
400
11fdf7f2
TL
401DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
402
403DEFINE_bool(rate_limit_bg_reads, false,
404 "Use options.rate_limiter on compaction reads");
405
406DEFINE_bool(use_txn, false,
407 "Use TransactionDB. Currently the default write policy is "
408 "TxnDBWritePolicy::WRITE_PREPARED");
409
410DEFINE_int32(backup_one_in, 0,
411 "If non-zero, then CreateNewBackup() will be called once for "
412 "every N operations on average. 0 indicates CreateNewBackup() "
413 "is disabled.");
414
415DEFINE_int32(checkpoint_one_in, 0,
416 "If non-zero, then CreateCheckpoint() will be called once for "
417 "every N operations on average. 0 indicates CreateCheckpoint() "
418 "is disabled.");
419
420DEFINE_int32(ingest_external_file_one_in, 0,
421 "If non-zero, then IngestExternalFile() will be called once for "
422 "every N operations on average. 0 indicates IngestExternalFile() "
423 "is disabled.");
424
425DEFINE_int32(ingest_external_file_width, 1000,
426 "The width of the ingested external files.");
427
7c673cae 428DEFINE_int32(compact_files_one_in, 0,
11fdf7f2
TL
429 "If non-zero, then CompactFiles() will be called once for every N "
430 "operations on average. 0 indicates CompactFiles() is disabled.");
431
432DEFINE_int32(compact_range_one_in, 0,
433 "If non-zero, then CompactRange() will be called once for every N "
434 "operations on average. 0 indicates CompactRange() is disabled.");
435
436DEFINE_int32(flush_one_in, 0,
437 "If non-zero, then Flush() will be called once for every N ops "
438 "on average. 0 indicates calls to Flush() are disabled.");
439
440DEFINE_int32(compact_range_width, 10000,
441 "The width of the ranges passed to CompactRange().");
442
443DEFINE_int32(acquire_snapshot_one_in, 0,
444 "If non-zero, then acquires a snapshot once every N operations on "
445 "average.");
446
447DEFINE_bool(compare_full_db_state_snapshot, false,
448 "If set we compare state of entire db (in one of the threads) with"
449 "each snapshot.");
450
451DEFINE_uint64(snapshot_hold_ops, 0,
452 "If non-zero, then releases snapshots N operations after they're "
453 "acquired.");
7c673cae
FG
454
// gflags validator: the flag must be a percentage in [0, 100].
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  const bool in_range = (value >= 0 && value <= 100);
  if (!in_range) {
    fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
            flagname, value);
  }
  return in_range;
}
463
464DEFINE_int32(readpercent, 10,
465 "Ratio of reads to total workload (expressed as a percentage)");
11fdf7f2 466static const bool FLAGS_readpercent_dummy __attribute__((__unused__)) =
7c673cae
FG
467 RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent);
468
469DEFINE_int32(prefixpercent, 20,
470 "Ratio of prefix iterators to total workload (expressed as a"
471 " percentage)");
11fdf7f2 472static const bool FLAGS_prefixpercent_dummy __attribute__((__unused__)) =
7c673cae
FG
473 RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent);
474
475DEFINE_int32(writepercent, 45,
476 "Ratio of writes to total workload (expressed as a percentage)");
11fdf7f2 477static const bool FLAGS_writepercent_dummy __attribute__((__unused__)) =
7c673cae
FG
478 RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent);
479
480DEFINE_int32(delpercent, 15,
481 "Ratio of deletes to total workload (expressed as a percentage)");
11fdf7f2 482static const bool FLAGS_delpercent_dummy __attribute__((__unused__)) =
7c673cae
FG
483 RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent);
484
485DEFINE_int32(delrangepercent, 0,
486 "Ratio of range deletions to total workload (expressed as a "
487 "percentage). Cannot be used with test_batches_snapshots");
11fdf7f2 488static const bool FLAGS_delrangepercent_dummy __attribute__((__unused__)) =
7c673cae
FG
489 RegisterFlagValidator(&FLAGS_delrangepercent, &ValidateInt32Percent);
490
491DEFINE_int32(nooverwritepercent, 60,
492 "Ratio of keys without overwrite to total workload (expressed as "
493 " a percentage)");
494static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) =
495 RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent);
496
497DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload"
498 " (expressed as a percentage)");
11fdf7f2 499static const bool FLAGS_iterpercent_dummy __attribute__((__unused__)) =
7c673cae
FG
500 RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent);
501
502DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
11fdf7f2 503static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) =
7c673cae
FG
504 RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
505
506namespace {
507enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
508 assert(ctype);
509
510 if (!strcasecmp(ctype, "none"))
511 return rocksdb::kNoCompression;
512 else if (!strcasecmp(ctype, "snappy"))
513 return rocksdb::kSnappyCompression;
514 else if (!strcasecmp(ctype, "zlib"))
515 return rocksdb::kZlibCompression;
516 else if (!strcasecmp(ctype, "bzip2"))
517 return rocksdb::kBZip2Compression;
518 else if (!strcasecmp(ctype, "lz4"))
519 return rocksdb::kLZ4Compression;
520 else if (!strcasecmp(ctype, "lz4hc"))
521 return rocksdb::kLZ4HCCompression;
522 else if (!strcasecmp(ctype, "xpress"))
523 return rocksdb::kXpressCompression;
524 else if (!strcasecmp(ctype, "zstd"))
525 return rocksdb::kZSTD;
526
11fdf7f2 527 fprintf(stderr, "Cannot parse compression type '%s'\n", ctype);
7c673cae
FG
528 return rocksdb::kSnappyCompression; //default value
529}
530
11fdf7f2
TL
531enum rocksdb::ChecksumType StringToChecksumType(const char* ctype) {
532 assert(ctype);
533 auto iter = rocksdb::checksum_type_string_map.find(ctype);
534 if (iter != rocksdb::checksum_type_string_map.end()) {
535 return iter->second;
536 }
537 fprintf(stderr, "Cannot parse checksum type '%s'\n", ctype);
538 return rocksdb::kCRC32c;
539}
540
541std::string ChecksumTypeToString(rocksdb::ChecksumType ctype) {
542 auto iter = std::find_if(
543 rocksdb::checksum_type_string_map.begin(),
544 rocksdb::checksum_type_string_map.end(),
545 [&](const std::pair<std::string, rocksdb::ChecksumType>&
546 name_and_enum_val) { return name_and_enum_val.second == ctype; });
547 assert(iter != rocksdb::checksum_type_string_map.end());
548 return iter->first;
549}
550
7c673cae
FG
// Split a comma-separated string into its pieces. An empty input yields an
// empty vector; a trailing comma yields a trailing empty piece.
std::vector<std::string> SplitString(std::string src) {
  std::vector<std::string> pieces;
  if (src.empty()) {
    return pieces;
  }
  size_t begin = 0;
  for (size_t comma = src.find(','); comma != std::string::npos;
       comma = src.find(',', begin)) {
    pieces.push_back(src.substr(begin, comma - begin));
    begin = comma + 1;
  }
  pieces.push_back(src.substr(begin));
  return pieces;
}
565} // namespace
566
567DEFINE_string(compression_type, "snappy",
568 "Algorithm to use to compress the database");
569static enum rocksdb::CompressionType FLAGS_compression_type_e =
570 rocksdb::kSnappyCompression;
571
11fdf7f2
TL
572DEFINE_int32(compression_max_dict_bytes, 0,
573 "Maximum size of dictionary used to prime the compression "
574 "library.");
575
576DEFINE_int32(compression_zstd_max_train_bytes, 0,
577 "Maximum size of training data passed to zstd's dictionary "
578 "trainer.");
579
580DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
581static enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c;
582
7c673cae
FG
583DEFINE_string(hdfs, "", "Name of hdfs environment");
584// posix or hdfs environment
585static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
586
587DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
11fdf7f2 588static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) =
7c673cae
FG
589 RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);
590
591DEFINE_uint64(log2_keys_per_lock, 2, "Log2 of number of keys per lock");
11fdf7f2 592static const bool FLAGS_log2_keys_per_lock_dummy __attribute__((__unused__)) =
7c673cae
FG
593 RegisterFlagValidator(&FLAGS_log2_keys_per_lock, &ValidateUint32Range);
594
11fdf7f2
TL
595DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file");
596
7c673cae
FG
597DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
598
599enum RepFactory {
600 kSkipList,
601 kHashSkipList,
602 kVectorRep
603};
604
605namespace {
606enum RepFactory StringToRepFactory(const char* ctype) {
607 assert(ctype);
608
609 if (!strcasecmp(ctype, "skip_list"))
610 return kSkipList;
611 else if (!strcasecmp(ctype, "prefix_hash"))
612 return kHashSkipList;
613 else if (!strcasecmp(ctype, "vector"))
614 return kVectorRep;
615
616 fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
617 return kSkipList;
618}
11fdf7f2
TL
619
620#ifdef _MSC_VER
621#pragma warning(push)
622// truncation of constant value on static_cast
623#pragma warning(disable : 4309)
624#endif
625bool GetNextPrefix(const rocksdb::Slice& src, std::string* v) {
626 std::string ret = src.ToString();
627 for (int i = static_cast<int>(ret.size()) - 1; i >= 0; i--) {
628 if (ret[i] != static_cast<char>(255)) {
629 ret[i] = ret[i] + 1;
630 break;
631 } else if (i != 0) {
632 ret[i] = 0;
633 } else {
634 // all FF. No next prefix
635 return false;
636 }
637 }
638 *v = ret;
639 return true;
640}
641#ifdef _MSC_VER
642#pragma warning(pop)
643#endif
7c673cae
FG
644} // namespace
645
646static enum RepFactory FLAGS_rep_factory;
647DEFINE_string(memtablerep, "prefix_hash", "");
648
// gflags validator: the HashSkipList prefix size must lie in [0, 8].
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (0 <= value && value <= 8) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n",
          flagname, value);
  return false;
}
657DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep");
11fdf7f2 658static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) =
7c673cae
FG
659 RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
660
661DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge "
662 "that behaves like a Put");
663
664DEFINE_bool(use_full_merge_v1, false,
665 "On true, use a merge operator that implement the deprecated "
666 "version of FullMerge");
667
668namespace rocksdb {
669
670// convert long to a big-endian slice key
// convert long to a big-endian slice key
// Big-endian byte order makes lexicographic key order match numeric order.
// (Equivalent to the original PutFixed64-then-reverse implementation.)
static std::string Key(int64_t val) {
  const uint64_t bits = static_cast<uint64_t>(val);
  std::string big_endian_key(sizeof(val), '\0');
  for (size_t i = 0; i < sizeof(val); ++i) {
    // Most significant byte first.
    big_endian_key[i] =
        static_cast<char>((bits >> (8 * (sizeof(val) - 1 - i))) & 0xff);
  }
  return big_endian_key;
}
682
11fdf7f2
TL
// Decode an 8-byte big-endian key (as produced by Key()) back into a
// uint64_t. The input must be exactly sizeof(uint64_t) bytes; always
// succeeds for such input. (Equivalent to the original byte-reverse +
// GetFixed64 round-trip.)
static bool GetIntVal(std::string big_endian_key, uint64_t *key_p) {
  const size_t size_key = sizeof(*key_p);
  assert(big_endian_key.size() == size_key);
  uint64_t decoded = 0;
  for (size_t i = 0; i < size_key; ++i) {
    decoded = (decoded << 8) | static_cast<unsigned char>(big_endian_key[i]);
  }
  *key_p = decoded;
  return true;
}
694
7c673cae
FG
// Render a (possibly binary) string as "0x" followed by uppercase hex
// digits, two per byte — the same format Slice::ToString(true) produces.
static std::string StringToHex(const std::string& str) {
  static const char kHexDigits[] = "0123456789ABCDEF";
  std::string result = "0x";
  result.reserve(2 + 2 * str.size());
  for (const char c : str) {
    const unsigned char byte = static_cast<unsigned char>(c);
    result.push_back(kHexDigits[byte >> 4]);
    result.push_back(kHexDigits[byte & 0x0f]);
  }
  return result;
}
700
701
702class StressTest;
703namespace {
704
705class Stats {
706 private:
707 uint64_t start_;
708 uint64_t finish_;
709 double seconds_;
710 long done_;
711 long gets_;
712 long prefixes_;
713 long writes_;
714 long deletes_;
715 size_t single_deletes_;
716 long iterator_size_sums_;
717 long founds_;
718 long iterations_;
719 long range_deletions_;
720 long covered_by_range_deletions_;
721 long errors_;
722 long num_compact_files_succeed_;
723 long num_compact_files_failed_;
724 int next_report_;
725 size_t bytes_;
726 uint64_t last_op_finish_;
727 HistogramImpl hist_;
728
729 public:
730 Stats() { }
731
732 void Start() {
733 next_report_ = 100;
734 hist_.Clear();
735 done_ = 0;
736 gets_ = 0;
737 prefixes_ = 0;
738 writes_ = 0;
739 deletes_ = 0;
740 single_deletes_ = 0;
741 iterator_size_sums_ = 0;
742 founds_ = 0;
743 iterations_ = 0;
744 range_deletions_ = 0;
745 covered_by_range_deletions_ = 0;
746 errors_ = 0;
747 bytes_ = 0;
748 seconds_ = 0;
749 num_compact_files_succeed_ = 0;
750 num_compact_files_failed_ = 0;
751 start_ = FLAGS_env->NowMicros();
752 last_op_finish_ = start_;
753 finish_ = start_;
754 }
755
756 void Merge(const Stats& other) {
757 hist_.Merge(other.hist_);
758 done_ += other.done_;
759 gets_ += other.gets_;
760 prefixes_ += other.prefixes_;
761 writes_ += other.writes_;
762 deletes_ += other.deletes_;
763 single_deletes_ += other.single_deletes_;
764 iterator_size_sums_ += other.iterator_size_sums_;
765 founds_ += other.founds_;
766 iterations_ += other.iterations_;
767 range_deletions_ += other.range_deletions_;
768 covered_by_range_deletions_ = other.covered_by_range_deletions_;
769 errors_ += other.errors_;
770 bytes_ += other.bytes_;
771 seconds_ += other.seconds_;
772 num_compact_files_succeed_ += other.num_compact_files_succeed_;
773 num_compact_files_failed_ += other.num_compact_files_failed_;
774 if (other.start_ < start_) start_ = other.start_;
775 if (other.finish_ > finish_) finish_ = other.finish_;
776 }
777
778 void Stop() {
779 finish_ = FLAGS_env->NowMicros();
780 seconds_ = (finish_ - start_) * 1e-6;
781 }
782
783 void FinishedSingleOp() {
784 if (FLAGS_histogram) {
785 auto now = FLAGS_env->NowMicros();
786 auto micros = now - last_op_finish_;
787 hist_.Add(micros);
788 if (micros > 20000) {
789 fprintf(stdout, "long op: %" PRIu64 " micros%30s\r", micros, "");
790 }
791 last_op_finish_ = now;
792 }
793
794 done_++;
795 if (FLAGS_progress_reports) {
796 if (done_ >= next_report_) {
797 if (next_report_ < 1000) next_report_ += 100;
798 else if (next_report_ < 5000) next_report_ += 500;
799 else if (next_report_ < 10000) next_report_ += 1000;
800 else if (next_report_ < 50000) next_report_ += 5000;
801 else if (next_report_ < 100000) next_report_ += 10000;
802 else if (next_report_ < 500000) next_report_ += 50000;
803 else next_report_ += 100000;
804 fprintf(stdout, "... finished %ld ops%30s\r", done_, "");
805 }
806 }
807 }
808
494da23a 809 void AddBytesForWrites(long nwrites, size_t nbytes) {
7c673cae
FG
810 writes_ += nwrites;
811 bytes_ += nbytes;
812 }
813
494da23a 814 void AddGets(long ngets, long nfounds) {
7c673cae
FG
815 founds_ += nfounds;
816 gets_ += ngets;
817 }
818
494da23a 819 void AddPrefixes(long nprefixes, long count) {
7c673cae
FG
820 prefixes_ += nprefixes;
821 iterator_size_sums_ += count;
822 }
823
  // Simple per-category counters, merged across threads by Merge() and
  // printed by Report().
  void AddIterations(long n) { iterations_ += n; }

  void AddDeletes(long n) { deletes_ += n; }

  void AddSingleDeletes(size_t n) { single_deletes_ += n; }

  void AddRangeDeletions(long n) { range_deletions_ += n; }

  void AddCoveredByRangeDeletions(long n) { covered_by_range_deletions_ += n; }

  void AddErrors(long n) { errors_ += n; }

  void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; }

  void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; }
7c673cae
FG
839
  // Print a human-readable summary of all accumulated counters to stdout.
  // `name` labels the throughput line (e.g. "Stress Test"). Bails out with a
  // message on stderr if nothing was written or executed.
  void Report(const char* name) {
    std::string extra;  // NOTE(review): appears unused here -- candidate for removal
    if (bytes_ < 1 || done_ < 1) {
      fprintf(stderr, "No writes or ops?\n");
      return;
    }

    double elapsed = (finish_ - start_) * 1e-6;
    double bytes_mb = bytes_ / 1048576.0;
    double rate = bytes_mb / elapsed;
    double throughput = (double)done_/elapsed;

    fprintf(stdout, "%-12s: ", name);
    fprintf(stdout, "%.3f micros/op %ld ops/sec\n",
            seconds_ * 1e6 / done_, (long)throughput);
    fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
            "", bytes_mb, rate, (100*writes_)/done_, done_);
    fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_);
    fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
    fprintf(stdout, "%-12s: Single deleted %" ROCKSDB_PRIszt " times\n", "",
            single_deletes_);
    fprintf(stdout, "%-12s: %ld read and %ld found the key\n", "",
            gets_, founds_);
    fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_);
    fprintf(stdout, "%-12s: Iterator size sum is %ld\n", "",
            iterator_size_sums_);
    fprintf(stdout, "%-12s: Iterated %ld times\n", "", iterations_);
    fprintf(stdout, "%-12s: Deleted %ld key-ranges\n", "", range_deletions_);
    fprintf(stdout, "%-12s: Range deletions covered %ld keys\n", "",
            covered_by_range_deletions_);

    fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
    fprintf(stdout, "%-12s: %ld CompactFiles() succeed\n", "",
            num_compact_files_succeed_);
    fprintf(stdout, "%-12s: %ld CompactFiles() did not succeed\n", "",
            num_compact_files_failed_);

    if (FLAGS_histogram) {
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    }
    fflush(stdout);
  }
882};
883
// State shared by all concurrent executions of the same benchmark.
//
// Holds the per-(column family, key) "expected values" array that worker
// threads keep in sync with what they write to the DB (optionally backed by
// a memory-mapped file so it survives crashes), the per-key-range mutexes,
// and the mutex/condvar rendezvous used to step all threads through the
// init -> operate -> verify phases together.
class SharedState {
 public:
  // indicates a key may have any value (or not be present) as an operation on
  // it is incomplete.
  static const uint32_t UNKNOWN_SENTINEL;
  // indicates a key should definitely be deleted
  static const uint32_t DELETION_SENTINEL;

  explicit SharedState(StressTest* stress_test)
      : cv_(&mu_),
        seed_(static_cast<uint32_t>(FLAGS_seed)),
        max_key_(FLAGS_max_key),
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
        num_threads_(FLAGS_threads),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
        should_stop_bg_thread_(false),
        bg_thread_finished_(false),
        stress_test_(stress_test),
        verification_failure_(false),
        no_overwrite_ids_(FLAGS_column_families),
        values_(nullptr) {
    // Pick random keys in each column family that will not experience
    // overwrite

    printf("Choosing random keys with no overwrite\n");
    Random64 rnd(seed_);
    // Start with the identity permutation. Subsequent iterations of
    // for loop below will start with perm of previous for loop
    int64_t *permutation = new int64_t[max_key_];
    for (int64_t i = 0; i < max_key_; i++) {
      permutation[i] = i;
    }
    // Now do the Knuth shuffle
    int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
    // Only need to figure out first num_no_overwrite_keys of permutation
    no_overwrite_ids_.reserve(num_no_overwrite_keys);
    for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
      int64_t rand_index = i + rnd.Next() % (max_key_ - i);
      // Swap i and rand_index;
      int64_t temp = permutation[i];
      permutation[i] = permutation[rand_index];
      permutation[rand_index] = temp;
      // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
      // permutation
      no_overwrite_ids_.insert(permutation[i]);
    }
    delete[] permutation;

    size_t expected_values_size =
        sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
    bool values_init_needed = false;
    Status status;
    if (!FLAGS_expected_values_path.empty()) {
      // Back the expected-values array with a memory-mapped file so it
      // survives a process crash (used for crash-recovery verification).
      if (!std::atomic<uint32_t>{}.is_lock_free()) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_path on platforms without lock-free "
            "std::atomic<uint32_t>");
      }
      if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_path on when "
            "--clear_column_family_one_in is greater than zero.");
      }
      uint64_t size = 0;
      if (status.ok()) {
        status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size);
      }
      std::unique_ptr<WritableFile> wfile;
      if (status.ok() && size == 0) {
        const EnvOptions soptions;
        status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile,
                                            soptions);
      }
      if (status.ok() && size == 0) {
        // Freshly created file: pre-size it with zeros before mapping it.
        std::string buf(expected_values_size, '\0');
        status = wfile->Append(buf);
        values_init_needed = true;
      }
      if (status.ok()) {
        status = FLAGS_env->NewMemoryMappedFileBuffer(
            FLAGS_expected_values_path, &expected_mmap_buffer_);
      }
      if (status.ok()) {
        assert(expected_mmap_buffer_->GetLen() == expected_values_size);
        values_ =
            static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->GetBase());
        assert(values_ != nullptr);
      } else {
        fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
                FLAGS_expected_values_path.c_str(), status.ToString().c_str());
        assert(values_ == nullptr);
      }
    }
    if (values_ == nullptr) {
      // No (usable) mmap backing: fall back to a plain in-memory array.
      values_allocation_.reset(
          new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
      values_ = &values_allocation_[0];
      values_init_needed = true;
    }
    assert(values_ != nullptr);
    if (values_init_needed) {
      // Start from "every key deleted" so expectations match an empty DB.
      for (int i = 0; i < FLAGS_column_families; ++i) {
        for (int j = 0; j < max_key_; ++j) {
          Delete(i, j, false /* pending */);
        }
      }
    }

    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }

    // One mutex guards 2^log2_keys_per_lock_ consecutive keys, per CF.
    long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
    if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
      num_locks++;
    }
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);

    for (int i = 0; i < FLAGS_column_families; ++i) {
      key_locks_[i].resize(num_locks);
      for (auto& ptr : key_locks_[i]) {
        ptr.reset(new port::Mutex);
      }
    }
  }

  ~SharedState() {}

  port::Mutex* GetMutex() {
    return &mu_;
  }

  port::CondVar* GetCondVar() {
    return &cv_;
  }

  StressTest* GetStressTest() const {
    return stress_test_;
  }

  int64_t GetMaxKey() const {
    return max_key_;
  }

  uint32_t GetNumThreads() const {
    return num_threads_;
  }

  // Phase-progress counters below are incremented under GetMutex() by worker
  // threads; the driver waits on the matching All*() predicates.
  void IncInitialized() {
    num_initialized_++;
  }

  void IncOperated() {
    num_populated_++;
  }

  void IncDone() {
    num_done_++;
  }

  void IncVotedReopen() {
    vote_reopen_ = (vote_reopen_ + 1) % num_threads_;
  }

  bool AllInitialized() const {
    return num_initialized_ >= num_threads_;
  }

  bool AllOperated() const {
    return num_populated_ >= num_threads_;
  }

  bool AllDone() const {
    return num_done_ >= num_threads_;
  }

  // True once every thread has voted (the modular counter wraps back to 0).
  bool AllVotedReopen() {
    return (vote_reopen_ == 0);
  }

  void SetStart() {
    start_ = true;
  }

  void SetStartVerify() {
    start_verify_ = true;
  }

  bool Started() const {
    return start_;
  }

  bool VerifyStarted() const {
    return start_verify_;
  }

  void SetVerificationFailure() { verification_failure_.store(true); }

  bool HasVerificationFailedYet() { return verification_failure_.load(); }

  port::Mutex* GetMutexForKey(int cf, int64_t key) {
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
  }

  void LockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Lock();
    }
  }

  void UnlockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Unlock();
    }
  }

  // Expected-value slot for (cf, key); the backing array is laid out as
  // FLAGS_column_families consecutive stripes of max_key_ entries.
  std::atomic<uint32_t>& Value(int cf, int64_t key) const {
    return values_[cf * max_key_ + key];
  }

  void ClearColumnFamily(int cf) {
    std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
              DELETION_SENTINEL);
  }

  // @param pending True if the update may have started but is not yet
  // guaranteed finished. This is useful for crash-recovery testing when the
  // process may crash before updating the expected values array.
  void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
    if (!pending) {
      // prevent expected-value update from reordering before Write
      std::atomic_thread_fence(std::memory_order_release);
    }
    Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
                         std::memory_order_relaxed);
    if (pending) {
      // prevent Write from reordering before expected-value update
      std::atomic_thread_fence(std::memory_order_release);
    }
  }

  uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  bool Delete(int cf, int64_t key, bool pending) {
    if (Value(cf, key) == DELETION_SENTINEL) {
      return false;
    }
    Put(cf, key, DELETION_SENTINEL, pending);
    return true;
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  bool SingleDelete(int cf, int64_t key, bool pending) {
    return Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns number of keys deleted by the call.
  int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
    int covered = 0;
    for (int64_t key = begin_key; key < end_key; ++key) {
      if (Delete(cf, key, pending)) {
        ++covered;
      }
    }
    return covered;
  }

  bool AllowsOverwrite(int64_t key) {
    return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
  }

  bool Exists(int cf, int64_t key) {
    // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
    // is disallowed can't be accidentally added a second time, in which case
    // SingleDelete wouldn't be able to properly delete the key. It does allow
    // the case where a SingleDelete might be added which covers nothing, but
    // that's not a correctness issue.
    uint32_t expected_value = Value(cf, key).load();
    return expected_value != DELETION_SENTINEL;
  }

  uint32_t GetSeed() const { return seed_; }

  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

  bool ShoudStopBgThread() { return should_stop_bg_thread_; }

  void SetBgThreadFinish() { bg_thread_finished_ = true; }

  bool BgThreadFinished() const { return bg_thread_finished_; }

  // True when expected values are file-backed, i.e. this run recovered state
  // from a previous (possibly crashed) run and should verify it up front.
  bool ShouldVerifyAtBeginning() const {
    return expected_mmap_buffer_.get() != nullptr;
  }

 private:
  port::Mutex mu_;
  port::CondVar cv_;
  const uint32_t seed_;
  const int64_t max_key_;
  const uint32_t log2_keys_per_lock_;
  const int num_threads_;
  long num_initialized_;
  long num_populated_;
  long vote_reopen_;
  long num_done_;
  bool start_;
  bool start_verify_;
  bool should_stop_bg_thread_;
  bool bg_thread_finished_;
  StressTest* stress_test_;
  std::atomic<bool> verification_failure_;

  // Keys that should not be overwritten
  std::unordered_set<size_t> no_overwrite_ids_;

  // Points either into expected_mmap_buffer_ or values_allocation_.
  std::atomic<uint32_t>* values_;
  std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
  // Has to make it owned by a smart ptr as port::Mutex is not copyable
  // and storing it in the container may require copying depending on the impl.
  std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
  std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
};
1219
11fdf7f2
TL
// Sentinel stored while an update to a key is in flight (see Put(pending)).
const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
// Sentinel meaning the key is expected to be absent/deleted.
const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff;
7c673cae
FG
1222
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;  // 0..n-1
  Random rand;   // Has different seeds for different threads
  SharedState* shared;
  Stats stats;
  // A read performed at a snapshot, remembered so the identical read can be
  // replayed later and compared for consistency (see AssertSame()).
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // optional state of all keys in the db
    std::vector<bool> *key_vec;
  };
  // Pending snapshot checks; the uint64_t is presumably the operation count
  // at which each snapshot should be re-verified -- confirm against OperateDb.
  std::queue<std::pair<uint64_t, SnapshotState> > snapshot_queue;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
1249
// EventListener that sanity-checks RocksDB callbacks during the stress run:
// column family names must be ones the test created, and every reported file
// path must live under a directory the test configured.
class DbStressListener : public EventListener {
 public:
  DbStressListener(const std::string& db_name,
                   const std::vector<DbPath>& db_paths,
                   const std::vector<ColumnFamilyDescriptor>& column_families)
      : db_name_(db_name),
        db_paths_(db_paths),
        column_families_(column_families),
        num_pending_file_creations_(0) {}
  virtual ~DbStressListener() {
    // Every OnTableFileCreationStarted() must have been balanced by an
    // OnTableFileCreated() before the listener goes away.
    assert(num_pending_file_creations_ == 0);
  }
#ifndef ROCKSDB_LITE
  virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
    assert(IsValidColumnFamilyName(info.cf_name));
    VerifyFilePath(info.file_path);
    // pretending doing some work here
    std::this_thread::sleep_for(
        std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
  }

  virtual void OnCompactionCompleted(DB* /*db*/,
                                     const CompactionJobInfo& ci) override {
    assert(IsValidColumnFamilyName(ci.cf_name));
    assert(ci.input_files.size() + ci.output_files.size() > 0U);
    for (const auto& file_path : ci.input_files) {
      VerifyFilePath(file_path);
    }
    for (const auto& file_path : ci.output_files) {
      VerifyFilePath(file_path);
    }
    // pretending doing some work here
    std::this_thread::sleep_for(
        std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
  }

  virtual void OnTableFileCreationStarted(
      const TableFileCreationBriefInfo& /*info*/) override {
    ++num_pending_file_creations_;
  }
  virtual void OnTableFileCreated(const TableFileCreationInfo& info) override {
    assert(info.db_name == db_name_);
    assert(IsValidColumnFamilyName(info.cf_name));
    if (info.file_size) {
      VerifyFilePath(info.file_path);
    }
    assert(info.job_id > 0 || FLAGS_compact_files_one_in > 0);
    if (info.status.ok() && info.file_size > 0) {
      // A successfully created, non-empty table file must carry real data.
      assert(info.table_properties.data_size > 0 ||
             info.table_properties.num_range_deletions > 0);
      assert(info.table_properties.raw_key_size > 0);
      assert(info.table_properties.num_entries > 0);
    }
    --num_pending_file_creations_;
  }

 protected:
  // Valid CF names are the default CF or all-digit names (the stress test
  // names its column families with numbers).
  bool IsValidColumnFamilyName(const std::string& cf_name) const {
    if (cf_name == kDefaultColumnFamilyName) {
      return true;
    }
    // The column family names in the stress tests are numbers.
    for (size_t i = 0; i < cf_name.size(); ++i) {
      if (cf_name[i] < '0' || cf_name[i] > '9') {
        return false;
      }
    }
    return true;
  }

  // Asserts (debug builds only) that `file_dir` is the DB dir, one of the
  // configured db_paths, or one of the per-CF cf_paths.
  void VerifyFileDir(const std::string& file_dir) {
#ifndef NDEBUG
    if (db_name_ == file_dir) {
      return;
    }
    for (const auto& db_path : db_paths_) {
      if (db_path.path == file_dir) {
        return;
      }
    }
    for (auto& cf : column_families_) {
      for (const auto& cf_path : cf.options.cf_paths) {
        if (cf_path.path == file_dir) {
          return;
        }
      }
    }
    assert(false);
#else
    (void)file_dir;
#endif  // !NDEBUG
  }

  // Asserts (debug builds only) that `file_name` parses as a table file name.
  void VerifyFileName(const std::string& file_name) {
#ifndef NDEBUG
    uint64_t file_number;
    FileType file_type;
    bool result = ParseFileName(file_name, &file_number, &file_type);
    assert(result);
    assert(file_type == kTableFile);
#else
    (void)file_name;
#endif  // !NDEBUG
  }

  // Splits `file_path` into directory and basename and verifies both.
  void VerifyFilePath(const std::string& file_path) {
#ifndef NDEBUG
    size_t pos = file_path.find_last_of("/");
    if (pos == std::string::npos) {
      VerifyFileName(file_path);
    } else {
      if (pos > 0) {
        VerifyFileDir(file_path.substr(0, pos));
      }
      VerifyFileName(file_path.substr(pos));
    }
#else
    (void)file_path;
#endif  // !NDEBUG
  }
#endif  // !ROCKSDB_LITE

 private:
  std::string db_name_;
  std::vector<DbPath> db_paths_;
  std::vector<ColumnFamilyDescriptor> column_families_;
  // Balance counter for started-vs-completed table file creations.
  std::atomic<int> num_pending_file_creations_;
};
1378
1379} // namespace
1380
1381class StressTest {
1382 public:
  // Builds the shared caches/filter policy from flags. When
  // --destroy_db_initially is set, removes leftover heap-profiler files and
  // destroys any existing DB so each run starts from a clean slate.
  StressTest()
      : cache_(NewCache(FLAGS_cache_size)),
        compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? FLAGS_use_block_based_filter
                                 ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
                                 : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
                           : nullptr),
        db_(nullptr),
#ifndef ROCKSDB_LITE
        txn_db_(nullptr),
#endif
        new_column_family_name_(1),
        num_times_reopened_(0),
        db_preload_finished_(false) {
    if (FLAGS_destroy_db_initially) {
      std::vector<std::string> files;
      FLAGS_env->GetChildren(FLAGS_db, &files);
      for (unsigned int i = 0; i < files.size(); i++) {
        if (Slice(files[i]).starts_with("heap-")) {
          FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
        }
      }
      Options options;
      options.env = FLAGS_env;
      Status s = DestroyDB(FLAGS_db, options);
      if (!s.ok()) {
        fprintf(stderr, "Cannot destroy original db: %s\n",
                s.ToString().c_str());
        exit(1);
      }
    }
  }
1416
  virtual ~StressTest() {
    // Column family handles must be released before the DB itself.
    for (auto cf : column_families_) {
      delete cf;
    }
    column_families_.clear();
    delete db_;
  }
1424
1425 std::shared_ptr<Cache> NewCache(size_t capacity) {
1426 if (capacity <= 0) {
1427 return nullptr;
1428 }
1429 if (FLAGS_use_clock_cache) {
1430 auto cache = NewClockCache((size_t)capacity);
1431 if (!cache) {
1432 fprintf(stderr, "Clock cache not supported.");
1433 exit(1);
1434 }
1435 return cache;
1436 } else {
1437 return NewLRUCache((size_t)capacity);
1438 }
1439 }
1440
  // Populate options_table_/options_index_ with candidate values for each
  // dynamically-settable option exercised when --set_options_one_in > 0.
  // Candidates are derived from the currently configured options_ values.
  // No-op (returns true) when the flag is disabled; always returns true.
  bool BuildOptionsTable() {
    if (FLAGS_set_options_one_in <= 0) {
      return true;
    }

    std::unordered_map<std::string, std::vector<std::string> > options_tbl = {
        {"write_buffer_size",
         {ToString(options_.write_buffer_size),
          ToString(options_.write_buffer_size * 2),
          ToString(options_.write_buffer_size * 4)}},
        {"max_write_buffer_number",
         {ToString(options_.max_write_buffer_number),
          ToString(options_.max_write_buffer_number * 2),
          ToString(options_.max_write_buffer_number * 4)}},
        {"arena_block_size",
         {
             ToString(options_.arena_block_size),
             ToString(options_.write_buffer_size / 4),
             ToString(options_.write_buffer_size / 8),
         }},
        {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
        {"max_successive_merges", {"0", "2", "4"}},
        {"inplace_update_num_locks", {"100", "200", "300"}},
        // TODO(ljin): enable test for this option
        // {"disable_auto_compactions", {"100", "200", "300"}},
        {"soft_rate_limit", {"0", "0.5", "0.9"}},
        {"hard_rate_limit", {"0", "1.1", "2.0"}},
        {"level0_file_num_compaction_trigger",
         {
             ToString(options_.level0_file_num_compaction_trigger),
             ToString(options_.level0_file_num_compaction_trigger + 2),
             ToString(options_.level0_file_num_compaction_trigger + 4),
         }},
        {"level0_slowdown_writes_trigger",
         {
             ToString(options_.level0_slowdown_writes_trigger),
             ToString(options_.level0_slowdown_writes_trigger + 2),
             ToString(options_.level0_slowdown_writes_trigger + 4),
         }},
        {"level0_stop_writes_trigger",
         {
             ToString(options_.level0_stop_writes_trigger),
             ToString(options_.level0_stop_writes_trigger + 2),
             ToString(options_.level0_stop_writes_trigger + 4),
         }},
        {"max_compaction_bytes",
         {
             ToString(options_.target_file_size_base * 5),
             ToString(options_.target_file_size_base * 15),
             ToString(options_.target_file_size_base * 100),
         }},
        {"target_file_size_base",
         {
             ToString(options_.target_file_size_base),
             ToString(options_.target_file_size_base * 2),
             ToString(options_.target_file_size_base * 4),
         }},
        {"target_file_size_multiplier",
         {
             ToString(options_.target_file_size_multiplier), "1", "2",
         }},
        {"max_bytes_for_level_base",
         {
             ToString(options_.max_bytes_for_level_base / 2),
             ToString(options_.max_bytes_for_level_base),
             ToString(options_.max_bytes_for_level_base * 2),
         }},
        {"max_bytes_for_level_multiplier",
         {
             ToString(options_.max_bytes_for_level_multiplier), "1", "2",
         }},
        {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
    };

    options_table_ = std::move(options_tbl);

    // options_index_ caches the option names for cheap random selection.
    for (const auto& iter : options_table_) {
      options_index_.push_back(iter.first);
    }
    return true;
  }
1522
  // Top-level driver: opens the DB, spawns the worker threads, walks them in
  // lock-step through init -> operate -> verify, then merges and reports the
  // per-thread stats. Returns false iff verification failed.
  bool Run() {
    uint64_t now = FLAGS_env->NowMicros();
    fprintf(stdout, "%s Initializing db_stress\n",
            FLAGS_env->TimeToString(now / 1000000).c_str());
    PrintEnv();
    Open();
    BuildOptionsTable();
    SharedState shared(this);

    if (FLAGS_read_only) {
      now = FLAGS_env->NowMicros();
      fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
              FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
      PreloadDbAndReopenAsReadOnly(FLAGS_max_key, &shared);
    }
    uint32_t n = shared.GetNumThreads();

    now = FLAGS_env->NowMicros();
    fprintf(stdout, "%s Initializing worker threads\n",
            FLAGS_env->TimeToString(now / 1000000).c_str());
    std::vector<ThreadState*> threads(n);
    for (uint32_t i = 0; i < n; i++) {
      threads[i] = new ThreadState(i, &shared);
      FLAGS_env->StartThread(ThreadBody, threads[i]);
    }
    // Optional background thread that randomly resizes the compaction pool.
    ThreadState bg_thread(0, &shared);
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
    }

    // Each thread goes through the following states:
    // initializing -> wait for others to init -> read/populate/depopulate
    // wait for others to operate -> verify -> done

    {
      MutexLock l(shared.GetMutex());
      while (!shared.AllInitialized()) {
        shared.GetCondVar()->Wait();
      }
      if (shared.ShouldVerifyAtBeginning()) {
        // Initial verification (crash-recovery runs) happened during init;
        // report its outcome now.
        if (shared.HasVerificationFailedYet()) {
          printf("Crash-recovery verification failed :(\n");
        } else {
          printf("Crash-recovery verification passed :)\n");
        }
      }

      now = FLAGS_env->NowMicros();
      fprintf(stdout, "%s Starting database operations\n",
              FLAGS_env->TimeToString(now/1000000).c_str());

      shared.SetStart();
      shared.GetCondVar()->SignalAll();
      while (!shared.AllOperated()) {
        shared.GetCondVar()->Wait();
      }

      now = FLAGS_env->NowMicros();
      if (FLAGS_test_batches_snapshots) {
        fprintf(stdout, "%s Limited verification already done during gets\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
      } else {
        fprintf(stdout, "%s Starting verification\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
      }

      shared.SetStartVerify();
      shared.GetCondVar()->SignalAll();
      while (!shared.AllDone()) {
        shared.GetCondVar()->Wait();
      }
    }

    // Fold all per-thread stats into thread 0 and report once.
    for (unsigned int i = 1; i < n; i++) {
      threads[0]->stats.Merge(threads[i]->stats);
    }
    threads[0]->stats.Report("Stress Test");

    for (unsigned int i = 0; i < n; i++) {
      delete threads[i];
      threads[i] = nullptr;
    }
    now = FLAGS_env->NowMicros();
    if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
      fprintf(stdout, "%s Verification successful\n",
              FLAGS_env->TimeToString(now/1000000).c_str());
    }
    PrintStatistics();

    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      // Ask the pool-resizing background thread to exit and wait for it.
      MutexLock l(shared.GetMutex());
      shared.SetShouldStopBgThread();
      while (!shared.BgThreadFinished()) {
        shared.GetCondVar()->Wait();
      }
    }

    if (shared.HasVerificationFailedYet()) {
      printf("Verification failed :(\n");
      return false;
    }
    return true;
  }
1626
11fdf7f2 1627 protected:
7c673cae
FG
  // Entry point for each worker thread. Synchronizes with its peers through
  // SharedState's mutex/condvar so all threads move through the phases
  // together: (optional initial verify) -> operate -> verify -> done.
  static void ThreadBody(void* v) {
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
    SharedState* shared = thread->shared;

    if (shared->ShouldVerifyAtBeginning()) {
      // Crash-recovery run: check the recovered DB against the persisted
      // expected values before issuing any new operations.
      thread->shared->GetStressTest()->VerifyDb(thread);
    }
    {
      MutexLock l(shared->GetMutex());
      shared->IncInitialized();
      if (shared->AllInitialized()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->Started()) {
        shared->GetCondVar()->Wait();
      }
    }
    thread->shared->GetStressTest()->OperateDb(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncOperated();
      if (shared->AllOperated()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->VerifyStarted()) {
        shared->GetCondVar()->Wait();
      }
    }

    thread->shared->GetStressTest()->VerifyDb(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncDone();
      if (shared->AllDone()) {
        shared->GetCondVar()->SignalAll();
      }
    }
  }
1668
  // Background thread that periodically picks a random compaction thread
  // pool size in [base - var, base + var] (clamped to >= 1) and applies it,
  // to stress pool-resizing while compactions run. Exits when the shared
  // stop flag is set, signalling BgThreadFinished().
  static void PoolSizeChangeThread(void* v) {
    assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
    SharedState* shared = thread->shared;

    while (true) {
      {
        MutexLock l(shared->GetMutex());
        if (shared->ShoudStopBgThread()) {
          shared->SetBgThreadFinish();
          shared->GetCondVar()->SignalAll();
          return;
        }
      }

      auto thread_pool_size_base = FLAGS_max_background_compactions;
      auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
      int new_thread_pool_size =
          thread_pool_size_base - thread_pool_size_var +
          thread->rand.Next() % (thread_pool_size_var * 2 + 1);
      if (new_thread_pool_size < 1) {
        new_thread_pool_size = 1;
      }
      FLAGS_env->SetBackgroundThreads(new_thread_pool_size);
      // Sleep up to 3 seconds
      // NOTE(review): the actual bound is
      // FLAGS_compaction_thread_pool_adjust_interval milliseconds, not a
      // fixed 3 seconds -- the comment matches only the default flag value.
      FLAGS_env->SleepForMicroseconds(
          thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
              1000 +
          1);
    }
  }
1700
11fdf7f2
TL
1701 static void PrintKeyValue(int cf, uint64_t key, const char* value,
1702 size_t sz) {
1703 if (!FLAGS_verbose) {
1704 return;
7c673cae 1705 }
11fdf7f2
TL
1706 std::string tmp;
1707 tmp.reserve(sz * 2 + 16);
1708 char buf[4];
1709 for (size_t i = 0; i < sz; i++) {
1710 snprintf(buf, 4, "%X", value[i]);
1711 tmp.append(buf);
7c673cae 1712 }
11fdf7f2
TL
1713 fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf,
1714 key, sz, tmp.c_str());
1715 }
7c673cae 1716
11fdf7f2
TL
  // Pick a random key inside a sliding "active" window of FLAGS_active_width
  // keys. The window's base advances linearly from 0 to
  // FLAGS_max_key - FLAGS_active_width as `iteration` approaches
  // FLAGS_ops_per_thread, so early ops target low keys and late ops high keys.
  static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
    const double completed_ratio =
        static_cast<double>(iteration) / FLAGS_ops_per_thread;
    const int64_t base_key = static_cast<int64_t>(
        completed_ratio * (FLAGS_max_key - FLAGS_active_width));
    return base_key + thread->rand.Next() % FLAGS_active_width;
  }
1724
11fdf7f2
TL
  // Fill `v` with a deterministic value derived from `rand` and return its
  // size: a multiple of FLAGS_value_size_mult between 1 and
  // kRandomValueMaxFactor multiples. The first 4 bytes store `rand` itself so
  // a reader can re-derive and verify the remaining payload bytes.
  static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
    size_t value_sz =
        ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
    assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
    (void) max_sz;
    *((uint32_t*)v) = rand;
    for (size_t i=sizeof(uint32_t); i < value_sz; i++) {
      v[i] = (char)(rand ^ i);
    }
    // NOTE(review): this writes v[value_sz], one byte past the returned
    // length, yet the assert above only guarantees value_sz <= max_sz.
    // Callers must pass a buffer of at least value_sz + 1 bytes -- confirm
    // all call sites size their buffers accordingly.
    v[value_sz] = '\0';
    return value_sz; // the size of the value set.
  }
7c673cae 1737
11fdf7f2
TL
  // Replay the read recorded in `snap_state` at its snapshot and compare with
  // the recorded outcome; optionally also re-scan all keys and compare with
  // the recorded key bitmap. Returns OK when consistent, Corruption when the
  // snapshot read now differs, or the Get's error status on failure.
  Status AssertSame(DB* db, ColumnFamilyHandle* cf,
                    ThreadState::SnapshotState& snap_state) {
    Status s;
    if (cf->GetName() != snap_state.cf_at_name) {
      // Presumably the handle no longer refers to the CF that was read at
      // snapshot time; nothing comparable -- skip. TODO confirm with callers.
      return s;
    }
    ReadOptions ropt;
    ropt.snapshot = snap_state.snapshot;
    PinnableSlice exp_v(&snap_state.value);
    exp_v.PinSelf();
    PinnableSlice v;
    s = db->Get(ropt, cf, snap_state.key, &v);
    if (!s.ok() && !s.IsNotFound()) {
      return s;
    }
    // Same status (found vs. not-found) expected as when first read.
    if (snap_state.status != s) {
      return Status::Corruption(
          "The snapshot gave inconsistent results for key " +
          ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
          " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
          ") vs. (" + s.ToString() + ")");
    }
    if (s.ok()) {
      if (exp_v != v) {
        return Status::Corruption("The snapshot gave inconsistent values: (" +
                                  exp_v.ToString() + ") vs. (" + v.ToString() +
                                  ")");
      }
    }
    if (snap_state.key_vec != nullptr) {
      // When `prefix_extractor` is set, seeking to beginning and scanning
      // across prefixes are only supported with `total_order_seek` set.
      ropt.total_order_seek = true;
      std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
      std::unique_ptr<std::vector<bool>> tmp_bitvec(new std::vector<bool>(FLAGS_max_key));
      for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
        uint64_t key_val;
        if (GetIntVal(iterator->key().ToString(), &key_val)) {
          (*tmp_bitvec.get())[key_val] = true;
        }
      }
      if (!std::equal(snap_state.key_vec->begin(),
                      snap_state.key_vec->end(),
                      tmp_bitvec.get()->begin())) {
        return Status::Corruption("Found inconsistent keys at this snapshot");
      }
    }
    return Status::OK();
  }
7c673cae 1787
494da23a
TL
1788 // Currently PreloadDb has to be single-threaded.
1789 void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
1790 SharedState* shared) {
1791 WriteOptions write_opts;
1792 write_opts.disableWAL = FLAGS_disable_wal;
1793 if (FLAGS_sync) {
1794 write_opts.sync = true;
1795 }
1796 char value[100];
1797 int cf_idx = 0;
1798 Status s;
1799 for (auto cfh : column_families_) {
1800 for (int64_t k = 0; k != number_of_keys; ++k) {
1801 std::string key_str = Key(k);
1802 Slice key = key_str;
1803 size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
1804 Slice v(value, sz);
1805 shared->Put(cf_idx, k, 0, true /* pending */);
1806
1807 if (FLAGS_use_merge) {
1808 if (!FLAGS_use_txn) {
1809 s = db_->Merge(write_opts, cfh, key, v);
1810 } else {
1811#ifndef ROCKSDB_LITE
1812 Transaction* txn;
1813 s = NewTxn(write_opts, &txn);
1814 if (s.ok()) {
1815 s = txn->Merge(cfh, key, v);
1816 if (s.ok()) {
1817 s = CommitTxn(txn);
1818 }
1819 }
1820#endif
1821 }
1822 } else {
1823 if (!FLAGS_use_txn) {
1824 s = db_->Put(write_opts, cfh, key, v);
1825 } else {
1826#ifndef ROCKSDB_LITE
1827 Transaction* txn;
1828 s = NewTxn(write_opts, &txn);
1829 if (s.ok()) {
1830 s = txn->Put(cfh, key, v);
1831 if (s.ok()) {
1832 s = CommitTxn(txn);
1833 }
1834 }
1835#endif
1836 }
1837 }
1838
1839 shared->Put(cf_idx, k, 0, false /* pending */);
1840 if (!s.ok()) {
1841 break;
1842 }
1843 }
1844 if (!s.ok()) {
1845 break;
1846 }
1847 ++cf_idx;
1848 }
1849 if (s.ok()) {
1850 s = db_->Flush(FlushOptions(), column_families_);
1851 }
1852 if (s.ok()) {
1853 for (auto cf : column_families_) {
1854 delete cf;
1855 }
1856 column_families_.clear();
1857 delete db_;
1858 db_ = nullptr;
1859#ifndef ROCKSDB_LITE
1860 txn_db_ = nullptr;
1861#endif
1862
1863 db_preload_finished_.store(true);
1864 auto now = FLAGS_env->NowMicros();
1865 fprintf(stdout, "%s Reopening database in read-only\n",
1866 FLAGS_env->TimeToString(now / 1000000).c_str());
1867 // Reopen as read-only, can ignore all options related to updates
1868 Open();
1869 } else {
1870 fprintf(stderr, "Failed to preload db");
1871 exit(1);
1872 }
1873 }
1874
11fdf7f2
TL
1875 Status SetOptions(ThreadState* thread) {
1876 assert(FLAGS_set_options_one_in > 0);
1877 std::unordered_map<std::string, std::string> opts;
1878 std::string name = options_index_[
1879 thread->rand.Next() % options_index_.size()];
1880 int value_idx = thread->rand.Next() % options_table_[name].size();
1881 if (name == "soft_rate_limit" || name == "hard_rate_limit") {
1882 opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
1883 opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
1884 } else if (name == "level0_file_num_compaction_trigger" ||
1885 name == "level0_slowdown_writes_trigger" ||
1886 name == "level0_stop_writes_trigger") {
1887 opts["level0_file_num_compaction_trigger"] =
1888 options_table_["level0_file_num_compaction_trigger"][value_idx];
1889 opts["level0_slowdown_writes_trigger"] =
1890 options_table_["level0_slowdown_writes_trigger"][value_idx];
1891 opts["level0_stop_writes_trigger"] =
1892 options_table_["level0_stop_writes_trigger"][value_idx];
7c673cae 1893 } else {
11fdf7f2 1894 opts[name] = options_table_[name][value_idx];
7c673cae
FG
1895 }
1896
11fdf7f2
TL
1897 int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
1898 auto cfh = column_families_[rand_cf_idx];
1899 return db_->SetOptions(cfh, opts);
7c673cae
FG
1900 }
1901
11fdf7f2
TL
1902#ifndef ROCKSDB_LITE
1903 Status NewTxn(WriteOptions& write_opts, Transaction** txn) {
1904 if (!FLAGS_use_txn) {
1905 return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
1906 }
1907 static std::atomic<uint64_t> txn_id = {0};
1908 TransactionOptions txn_options;
1909 *txn = txn_db_->BeginTransaction(write_opts, txn_options);
1910 auto istr = std::to_string(txn_id.fetch_add(1));
1911 Status s = (*txn)->SetName("xid" + istr);
1912 return s;
1913 }
7c673cae 1914
11fdf7f2
TL
1915 Status CommitTxn(Transaction* txn) {
1916 if (!FLAGS_use_txn) {
1917 return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
7c673cae 1918 }
11fdf7f2
TL
1919 Status s = txn->Prepare();
1920 if (s.ok()) {
1921 s = txn->Commit();
7c673cae 1922 }
11fdf7f2 1923 delete txn;
7c673cae
FG
1924 return s;
1925 }
11fdf7f2 1926#endif
7c673cae 1927
11fdf7f2
TL
1928 virtual void OperateDb(ThreadState* thread) {
1929 ReadOptions read_opts(FLAGS_verify_checksum, true);
1930 WriteOptions write_opts;
1931 auto shared = thread->shared;
1932 char value[100];
1933 std::string from_db;
1934 if (FLAGS_sync) {
1935 write_opts.sync = true;
7c673cae
FG
1936 }
1937 write_opts.disableWAL = FLAGS_disable_wal;
1938 const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent;
1939 const int writeBound = prefixBound + (int)FLAGS_writepercent;
1940 const int delBound = writeBound + (int)FLAGS_delpercent;
1941 const int delRangeBound = delBound + (int)FLAGS_delrangepercent;
1942
1943 thread->stats.Start();
1944 for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
1945 if (thread->shared->HasVerificationFailedYet()) {
1946 break;
1947 }
1948 if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) {
1949 {
1950 thread->stats.FinishedSingleOp();
1951 MutexLock l(thread->shared->GetMutex());
11fdf7f2
TL
1952 while (!thread->snapshot_queue.empty()) {
1953 db_->ReleaseSnapshot(
1954 thread->snapshot_queue.front().second.snapshot);
1955 delete thread->snapshot_queue.front().second.key_vec;
1956 thread->snapshot_queue.pop();
1957 }
7c673cae
FG
1958 thread->shared->IncVotedReopen();
1959 if (thread->shared->AllVotedReopen()) {
1960 thread->shared->GetStressTest()->Reopen();
1961 thread->shared->GetCondVar()->SignalAll();
494da23a 1962 } else {
7c673cae
FG
1963 thread->shared->GetCondVar()->Wait();
1964 }
1965 // Commenting this out as we don't want to reset stats on each open.
1966 // thread->stats.Start();
1967 }
1968 }
1969
1970 // Change Options
1971 if (FLAGS_set_options_one_in > 0 &&
1972 thread->rand.OneIn(FLAGS_set_options_one_in)) {
1973 SetOptions(thread);
1974 }
1975
1976 if (FLAGS_set_in_place_one_in > 0 &&
1977 thread->rand.OneIn(FLAGS_set_in_place_one_in)) {
1978 options_.inplace_update_support ^= options_.inplace_update_support;
1979 }
1980
11fdf7f2
TL
1981 MaybeClearOneColumnFamily(thread);
1982
1983#ifndef ROCKSDB_LITE
7c673cae
FG
1984 if (FLAGS_compact_files_one_in > 0 &&
1985 thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
1986 auto* random_cf =
1987 column_families_[thread->rand.Next() % FLAGS_column_families];
1988 rocksdb::ColumnFamilyMetaData cf_meta_data;
1989 db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data);
1990
1991 // Randomly compact up to three consecutive files from a level
1992 const int kMaxRetry = 3;
1993 for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
1994 size_t random_level = thread->rand.Uniform(
1995 static_cast<int>(cf_meta_data.levels.size()));
1996
1997 const auto& files = cf_meta_data.levels[random_level].files;
1998 if (files.size() > 0) {
1999 size_t random_file_index =
2000 thread->rand.Uniform(static_cast<int>(files.size()));
2001 if (files[random_file_index].being_compacted) {
2002 // Retry as the selected file is currently being compacted
2003 continue;
2004 }
2005
2006 std::vector<std::string> input_files;
2007 input_files.push_back(files[random_file_index].name);
2008 if (random_file_index > 0 &&
2009 !files[random_file_index - 1].being_compacted) {
2010 input_files.push_back(files[random_file_index - 1].name);
2011 }
2012 if (random_file_index + 1 < files.size() &&
2013 !files[random_file_index + 1].being_compacted) {
2014 input_files.push_back(files[random_file_index + 1].name);
2015 }
2016
2017 size_t output_level =
2018 std::min(random_level + 1, cf_meta_data.levels.size() - 1);
2019 auto s =
2020 db_->CompactFiles(CompactionOptions(), random_cf, input_files,
2021 static_cast<int>(output_level));
2022 if (!s.ok()) {
11fdf7f2
TL
2023 fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
2024 s.ToString().c_str());
7c673cae
FG
2025 thread->stats.AddNumCompactFilesFailed(1);
2026 } else {
2027 thread->stats.AddNumCompactFilesSucceed(1);
2028 }
2029 break;
2030 }
2031 }
2032 }
2033#endif // !ROCKSDB_LITE
11fdf7f2 2034 int64_t rand_key = GenerateOneKey(thread, i);
7c673cae
FG
2035 int rand_column_family = thread->rand.Next() % FLAGS_column_families;
2036 std::string keystr = Key(rand_key);
2037 Slice key = keystr;
11fdf7f2
TL
2038 std::unique_ptr<MutexLock> lock;
2039 if (ShouldAcquireMutexOnKey()) {
2040 lock.reset(new MutexLock(
7c673cae
FG
2041 shared->GetMutexForKey(rand_column_family, rand_key)));
2042 }
11fdf7f2 2043
7c673cae
FG
2044 auto column_family = column_families_[rand_column_family];
2045
11fdf7f2
TL
2046 if (FLAGS_compact_range_one_in > 0 &&
2047 thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
2048 int64_t end_key_num;
2049 if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
2050 end_key_num = port::kMaxInt64;
2051 } else {
2052 end_key_num = FLAGS_compact_range_width + rand_key;
2053 }
2054 std::string end_key_buf = Key(end_key_num);
2055 Slice end_key(end_key_buf);
2056
2057 CompactRangeOptions cro;
2058 cro.exclusive_manual_compaction =
2059 static_cast<bool>(thread->rand.Next() % 2);
2060 Status status = db_->CompactRange(cro, column_family, &key, &end_key);
2061 if (!status.ok()) {
2062 printf("Unable to perform CompactRange(): %s\n",
2063 status.ToString().c_str());
2064 }
2065 }
2066
2067 std::vector<int> rand_column_families =
2068 GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
494da23a
TL
2069
2070 if (FLAGS_flush_one_in > 0 &&
2071 thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
2072 FlushOptions flush_opts;
2073 std::vector<ColumnFamilyHandle*> cfhs;
2074 std::for_each(
2075 rand_column_families.begin(), rand_column_families.end(),
2076 [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
2077 Status status = db_->Flush(flush_opts, cfhs);
2078 if (!status.ok()) {
2079 fprintf(stdout, "Unable to perform Flush(): %s\n",
2080 status.ToString().c_str());
2081 }
2082 }
2083
11fdf7f2
TL
2084 std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
2085
2086 if (FLAGS_ingest_external_file_one_in > 0 &&
2087 thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) {
2088 TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
2089 }
2090
494da23a
TL
2091 if (FLAGS_backup_one_in > 0 &&
2092 thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
2093 Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
2094 if (!s.ok()) {
2095 VerificationAbort(shared, "Backup/restore gave inconsistent state",
2096 s);
2097 }
2098 }
2099
2100 if (FLAGS_checkpoint_one_in > 0 &&
2101 thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
2102 Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
2103 if (!s.ok()) {
2104 VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
2105 }
2106 }
2107
11fdf7f2
TL
2108 if (FLAGS_acquire_snapshot_one_in > 0 &&
2109 thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
2110 auto snapshot = db_->GetSnapshot();
2111 ReadOptions ropt;
2112 ropt.snapshot = snapshot;
2113 std::string value_at;
2114 // When taking a snapshot, we also read a key from that snapshot. We
2115 // will later read the same key before releasing the snapshot and verify
2116 // that the results are the same.
2117 auto status_at = db_->Get(ropt, column_family, key, &value_at);
2118 std::vector<bool> *key_vec = nullptr;
2119
2120 if (FLAGS_compare_full_db_state_snapshot &&
2121 (thread->tid == 0)) {
2122 key_vec = new std::vector<bool>(FLAGS_max_key);
494da23a
TL
2123 // When `prefix_extractor` is set, seeking to beginning and scanning
2124 // across prefixes are only supported with `total_order_seek` set.
2125 ropt.total_order_seek = true;
11fdf7f2
TL
2126 std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
2127 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2128 uint64_t key_val;
2129 if (GetIntVal(iterator->key().ToString(), &key_val)) {
2130 (*key_vec)[key_val] = true;
2131 }
2132 }
2133 }
2134
2135 ThreadState::SnapshotState snap_state = {
2136 snapshot, rand_column_family, column_family->GetName(),
2137 keystr, status_at, value_at, key_vec};
2138 thread->snapshot_queue.emplace(
2139 std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
2140 snap_state);
2141 }
2142 while (!thread->snapshot_queue.empty() &&
2143 i == thread->snapshot_queue.front().first) {
2144 auto snap_state = thread->snapshot_queue.front().second;
2145 assert(snap_state.snapshot);
2146 // Note: this is unsafe as the cf might be dropped concurrently. But it
2147 // is ok since unclean cf drop is cunnrently not supported by write
2148 // prepared transactions.
2149 Status s =
2150 AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
2151 if (!s.ok()) {
2152 VerificationAbort(shared, "Snapshot gave inconsistent state", s);
2153 }
2154 db_->ReleaseSnapshot(snap_state.snapshot);
2155 delete snap_state.key_vec;
2156 thread->snapshot_queue.pop();
2157 }
2158
7c673cae
FG
2159 int prob_op = thread->rand.Uniform(100);
2160 if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
2161 // OPERATION read
11fdf7f2 2162 TestGet(thread, read_opts, rand_column_families, rand_keys);
7c673cae
FG
2163 } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
2164 // OPERATION prefix scan
2165 // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
2166 // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
2167 // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
2168 // prefix
11fdf7f2 2169 TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
7c673cae
FG
2170 } else if (prefixBound <= prob_op && prob_op < writeBound) {
2171 // OPERATION write
11fdf7f2
TL
2172 TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
2173 value, lock);
7c673cae
FG
2174 } else if (writeBound <= prob_op && prob_op < delBound) {
2175 // OPERATION delete
11fdf7f2 2176 TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
7c673cae
FG
2177 } else if (delBound <= prob_op && prob_op < delRangeBound) {
2178 // OPERATION delete range
11fdf7f2
TL
2179 TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
2180 lock);
7c673cae
FG
2181 } else {
2182 // OPERATION iterate
11fdf7f2 2183 TestIterate(thread, read_opts, rand_column_families, rand_keys);
7c673cae
FG
2184 }
2185 thread->stats.FinishedSingleOp();
2186 }
2187
2188 thread->stats.Stop();
2189 }
2190
11fdf7f2
TL
  // Verifies the whole DB against the expected-state oracle; implemented by
  // the concrete (batched / non-batched) stress test subclasses.
  virtual void VerifyDb(ThreadState* thread) const = 0;

  // Hook to occasionally drop and recreate a column family; no-op here.
  virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}

  // Whether OperateDb() must hold the per-key mutex around each operation.
  virtual bool ShouldAcquireMutexOnKey() const { return false; }

  // Column families an operation should touch; by default just the one
  // randomly chosen for this iteration.
  virtual std::vector<int> GenerateColumnFamilies(
      const int /* num_column_families */, int rand_column_family) const {
    return {rand_column_family};
  }

  // Keys an operation should touch; by default just the randomly chosen key.
  virtual std::vector<int64_t> GenerateKeys(int64_t rand_key) const {
    return {rand_key};
  }

  // Point-read the given keys and verify against the oracle.
  virtual Status TestGet(ThreadState* thread,
                         const ReadOptions& read_opts,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys) = 0;

  // Scan all keys sharing the prefix of the given key.
  virtual Status TestPrefixScan(ThreadState* thread,
                                const ReadOptions& read_opts,
                                const std::vector<int>& rand_column_families,
                                const std::vector<int64_t>& rand_keys) = 0;

  // Write the given keys; `value` is scratch space for value generation and
  // `lock` is the per-key mutex already held by the caller (may be null).
  virtual Status TestPut(ThreadState* thread,
                         WriteOptions& write_opts, const ReadOptions& read_opts,
                         const std::vector<int>& cf_ids, const std::vector<int64_t>& keys,
                         char (&value)[100], std::unique_ptr<MutexLock>& lock) = 0;

  // Delete the given keys.
  virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                            const std::vector<int>& rand_column_families,
                            const std::vector<int64_t>& rand_keys,
                            std::unique_ptr<MutexLock>& lock) = 0;

  // Delete a range starting at the given keys.
  virtual Status TestDeleteRange(ThreadState* thread,
                                 WriteOptions& write_opts,
                                 const std::vector<int>& rand_column_families,
                                 const std::vector<int64_t>& rand_keys,
                                 std::unique_ptr<MutexLock>& lock) = 0;

  // Build an external SST covering the given keys and ingest it.
  virtual void TestIngestExternalFile(
      ThreadState* thread, const std::vector<int>& rand_column_families,
      const std::vector<int64_t>& rand_keys,
      std::unique_ptr<MutexLock>& lock) = 0;
2236
2237 // Given a key K, this creates an iterator which scans to K and then
2238 // does a random sequence of Next/Prev operations.
2239 virtual Status TestIterate(ThreadState* thread,
2240 const ReadOptions& read_opts,
2241 const std::vector<int>& rand_column_families,
2242 const std::vector<int64_t>& rand_keys) {
2243 Status s;
2244 const Snapshot* snapshot = db_->GetSnapshot();
2245 ReadOptions readoptionscopy = read_opts;
2246 readoptionscopy.snapshot = snapshot;
2247
2248 std::string upper_bound_str;
2249 Slice upper_bound;
2250 if (thread->rand.OneIn(16)) {
2251 // in 1/16 chance, set a iterator upper bound
2252 int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
2253 upper_bound_str = Key(rand_upper_key);
2254 upper_bound = Slice(upper_bound_str);
2255 // uppder_bound can be smaller than seek key, but the query itself
2256 // should not crash either.
2257 readoptionscopy.iterate_upper_bound = &upper_bound;
2258 }
2259 std::string lower_bound_str;
2260 Slice lower_bound;
2261 if (thread->rand.OneIn(16)) {
2262 // in 1/16 chance, set a iterator lower bound
2263 int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
2264 lower_bound_str = Key(rand_lower_key);
2265 lower_bound = Slice(lower_bound_str);
2266 // uppder_bound can be smaller than seek key, but the query itself
2267 // should not crash either.
2268 readoptionscopy.iterate_lower_bound = &lower_bound;
2269 }
2270
2271 auto cfh = column_families_[rand_column_families[0]];
2272 std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));
2273
2274 std::string key_str = Key(rand_keys[0]);
2275 Slice key = key_str;
2276 iter->Seek(key);
2277 for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
2278 if (thread->rand.OneIn(2)) {
2279 iter->Next();
2280 } else {
2281 iter->Prev();
7c673cae 2282 }
7c673cae
FG
2283 }
2284
2285 if (s.ok()) {
11fdf7f2 2286 thread->stats.AddIterations(1);
7c673cae 2287 } else {
11fdf7f2 2288 thread->stats.AddErrors(1);
7c673cae 2289 }
11fdf7f2
TL
2290
2291 db_->ReleaseSnapshot(snapshot);
2292
2293 return s;
7c673cae
FG
2294 }
2295
494da23a
TL
2296#ifdef ROCKSDB_LITE
2297 virtual Status TestBackupRestore(
2298 ThreadState* /* thread */,
2299 const std::vector<int>& /* rand_column_families */,
2300 const std::vector<int64_t>& /* rand_keys */) {
2301 assert(false);
2302 fprintf(stderr,
2303 "RocksDB lite does not support "
2304 "TestBackupRestore\n");
2305 std::terminate();
2306 }
2307
2308 virtual Status TestCheckpoint(
2309 ThreadState* /* thread */,
2310 const std::vector<int>& /* rand_column_families */,
2311 const std::vector<int64_t>& /* rand_keys */) {
2312 assert(false);
2313 fprintf(stderr,
2314 "RocksDB lite does not support "
2315 "TestCheckpoint\n");
2316 std::terminate();
2317 }
2318#else // ROCKSDB_LITE
2319 virtual Status TestBackupRestore(ThreadState* thread,
2320 const std::vector<int>& rand_column_families,
2321 const std::vector<int64_t>& rand_keys) {
2322 // Note the column families chosen by `rand_column_families` cannot be
2323 // dropped while the locks for `rand_keys` are held. So we should not have
2324 // to worry about accessing those column families throughout this function.
2325 assert(rand_column_families.size() == rand_keys.size());
2326 std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
2327 std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
2328 BackupableDBOptions backup_opts(backup_dir);
2329 BackupEngine* backup_engine = nullptr;
2330 Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
2331 if (s.ok()) {
2332 s = backup_engine->CreateNewBackup(db_);
2333 }
2334 if (s.ok()) {
2335 delete backup_engine;
2336 backup_engine = nullptr;
2337 s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
2338 }
2339 if (s.ok()) {
2340 s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
2341 restore_dir /* wal_dir */);
2342 }
2343 if (s.ok()) {
2344 s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
2345 }
2346 DB* restored_db = nullptr;
2347 std::vector<ColumnFamilyHandle*> restored_cf_handles;
2348 if (s.ok()) {
2349 Options restore_options(options_);
2350 restore_options.listeners.clear();
2351 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2352 // TODO(ajkr): `column_family_names_` is not safe to access here when
2353 // `clear_column_family_one_in != 0`. But we can't easily switch to
2354 // `ListColumnFamilies` to get names because it won't necessarily give
2355 // the same order as `column_family_names_`.
2356 assert(FLAGS_clear_column_family_one_in == 0);
2357 for (auto name : column_family_names_) {
2358 cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
2359 }
2360 s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
2361 &restored_cf_handles, &restored_db);
2362 }
2363 // for simplicity, currently only verifies existence/non-existence of a few
2364 // keys
2365 for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
2366 std::string key_str = Key(rand_keys[i]);
2367 Slice key = key_str;
2368 std::string restored_value;
2369 Status get_status = restored_db->Get(
2370 ReadOptions(), restored_cf_handles[rand_column_families[i]], key,
2371 &restored_value);
2372 bool exists =
2373 thread->shared->Exists(rand_column_families[i], rand_keys[i]);
2374 if (get_status.ok()) {
2375 if (!exists) {
2376 s = Status::Corruption(
2377 "key exists in restore but not in original db");
2378 }
2379 } else if (get_status.IsNotFound()) {
2380 if (exists) {
2381 s = Status::Corruption(
2382 "key exists in original db but not in restore");
2383 }
2384 } else {
2385 s = get_status;
2386 }
2387 }
2388 if (backup_engine != nullptr) {
2389 delete backup_engine;
2390 backup_engine = nullptr;
2391 }
2392 if (restored_db != nullptr) {
2393 for (auto* cf_handle : restored_cf_handles) {
2394 restored_db->DestroyColumnFamilyHandle(cf_handle);
2395 }
2396 delete restored_db;
2397 restored_db = nullptr;
2398 }
2399 if (!s.ok()) {
2400 printf("A backup/restore operation failed with: %s\n",
2401 s.ToString().c_str());
2402 }
2403 return s;
2404 }
2405
2406 virtual Status TestCheckpoint(ThreadState* thread,
2407 const std::vector<int>& rand_column_families,
2408 const std::vector<int64_t>& rand_keys) {
2409 // Note the column families chosen by `rand_column_families` cannot be
2410 // dropped while the locks for `rand_keys` are held. So we should not have
2411 // to worry about accessing those column families throughout this function.
2412 assert(rand_column_families.size() == rand_keys.size());
2413 std::string checkpoint_dir =
2414 FLAGS_db + "/.checkpoint" + ToString(thread->tid);
2415 DestroyDB(checkpoint_dir, Options());
2416 Checkpoint* checkpoint = nullptr;
2417 Status s = Checkpoint::Create(db_, &checkpoint);
2418 if (s.ok()) {
2419 s = checkpoint->CreateCheckpoint(checkpoint_dir);
2420 }
2421 std::vector<ColumnFamilyHandle*> cf_handles;
2422 DB* checkpoint_db = nullptr;
2423 if (s.ok()) {
2424 delete checkpoint;
2425 checkpoint = nullptr;
2426 Options options(options_);
2427 options.listeners.clear();
2428 std::vector<ColumnFamilyDescriptor> cf_descs;
2429 // TODO(ajkr): `column_family_names_` is not safe to access here when
2430 // `clear_column_family_one_in != 0`. But we can't easily switch to
2431 // `ListColumnFamilies` to get names because it won't necessarily give
2432 // the same order as `column_family_names_`.
2433 if (FLAGS_clear_column_family_one_in == 0) {
2434 for (const auto& name : column_family_names_) {
2435 cf_descs.emplace_back(name, ColumnFamilyOptions(options));
2436 }
2437 s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
2438 &cf_handles, &checkpoint_db);
2439 }
2440 }
2441 if (checkpoint_db != nullptr) {
2442 for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
2443 std::string key_str = Key(rand_keys[i]);
2444 Slice key = key_str;
2445 std::string value;
2446 Status get_status = checkpoint_db->Get(
2447 ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
2448 bool exists =
2449 thread->shared->Exists(rand_column_families[i], rand_keys[i]);
2450 if (get_status.ok()) {
2451 if (!exists) {
2452 s = Status::Corruption(
2453 "key exists in checkpoint but not in original db");
2454 }
2455 } else if (get_status.IsNotFound()) {
2456 if (exists) {
2457 s = Status::Corruption(
2458 "key exists in original db but not in checkpoint");
2459 }
2460 } else {
2461 s = get_status;
2462 }
2463 }
2464 for (auto cfh : cf_handles) {
2465 delete cfh;
2466 }
2467 cf_handles.clear();
2468 delete checkpoint_db;
2469 checkpoint_db = nullptr;
2470 }
2471 DestroyDB(checkpoint_dir, Options());
2472 if (!s.ok()) {
2473 fprintf(stderr, "A checkpoint operation failed with: %s\n",
2474 s.ToString().c_str());
2475 }
2476 return s;
2477 }
2478#endif // ROCKSDB_LITE
2479
11fdf7f2
TL
2480 void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
2481 printf("Verification failed: %s. Status is %s\n", msg.c_str(),
2482 s.ToString().c_str());
2483 shared->SetVerificationFailure();
7c673cae
FG
2484 }
2485
11fdf7f2
TL
2486 void VerificationAbort(SharedState* shared, std::string msg, int cf,
2487 int64_t key) const {
2488 printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, key,
2489 msg.c_str());
2490 shared->SetVerificationFailure();
7c673cae
FG
2491 }
2492
  // Prints the effective test configuration (version, flags, operation mix,
  // memtable/compression choices, kill points) to stdout before the run.
  void PrintEnv() const {
    fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
            kMinorVersion);
    fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
    fprintf(stdout, "TransactionDB : %s\n",
            FLAGS_use_txn ? "true" : "false");
    fprintf(stdout, "Read only mode : %s\n",
            FLAGS_read_only ? "true" : "false");
    fprintf(stdout, "Atomic flush : %s\n",
            FLAGS_atomic_flush ? "true" : "false");
    fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
    // CF clearing is incompatible with the batched-snapshots mode, so the
    // setting is only meaningful (and printed) outside it.
    if (!FLAGS_test_batches_snapshots) {
      fprintf(stdout, "Clear CFs one in : %d\n",
              FLAGS_clear_column_family_one_in);
    }
    fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
    fprintf(stdout, "Ops per thread : %lu\n",
            (unsigned long)FLAGS_ops_per_thread);
    std::string ttl_state("unused");
    if (FLAGS_ttl > 0) {
      ttl_state = NumberToString(FLAGS_ttl);
    }
    fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
    // Operation-mix percentages; these should sum to at most 100, with the
    // remainder going to iteration (see OperateDb()).
    fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
    fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
    fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
    fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
    fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
    fprintf(stdout, "No overwrite percentage : %d%%\n",
            FLAGS_nooverwritepercent);
    fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
    fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
            FLAGS_db_write_buffer_size);
    fprintf(stdout, "Write-buffer-size : %d\n",
            FLAGS_write_buffer_size);
    fprintf(stdout, "Iterations : %lu\n",
            (unsigned long)FLAGS_num_iterations);
    fprintf(stdout, "Max key : %lu\n",
            (unsigned long)FLAGS_max_key);
    fprintf(stdout, "Ratio #ops/#keys : %f\n",
            (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
    fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
    fprintf(stdout, "Batches/snapshots : %d\n",
            FLAGS_test_batches_snapshots);
    fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
    fprintf(stdout, "Num keys per lock : %d\n",
            1 << FLAGS_log2_keys_per_lock);
    std::string compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression : %s\n", compression.c_str());
    std::string checksum = ChecksumTypeToString(FLAGS_checksum_type_e);
    fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
    fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
            FLAGS_subcompactions);

    // Translate the memtable rep-factory enum into a printable name.
    const char* memtablerep = "";
    switch (FLAGS_rep_factory) {
      case kSkipList:
        memtablerep = "skip_list";
        break;
      case kHashSkipList:
        memtablerep = "prefix_hash";
        break;
      case kVectorRep:
        memtablerep = "vector";
        break;
    }

    fprintf(stdout, "Memtablerep : %s\n", memtablerep);

    // Crash-test kill-point configuration (see sync_point/fault injection).
    fprintf(stdout, "Test kill odd : %d\n", rocksdb_kill_odds);
    if (!rocksdb_kill_prefix_blacklist.empty()) {
      fprintf(stdout, "Skipping kill points prefixes:\n");
      for (auto& p : rocksdb_kill_prefix_blacklist) {
        fprintf(stdout, " %s\n", p.c_str());
      }
    }

    fprintf(stdout, "------------------------------------------------\n");
  }
2572
2573 void Open() {
2574 assert(db_ == nullptr);
11fdf7f2
TL
2575#ifndef ROCKSDB_LITE
2576 assert(txn_db_ == nullptr);
2577#endif
2578 if (FLAGS_options_file.empty()) {
2579 BlockBasedTableOptions block_based_options;
2580 block_based_options.block_cache = cache_;
2581 block_based_options.block_cache_compressed = compressed_cache_;
2582 block_based_options.checksum = FLAGS_checksum_type_e;
2583 block_based_options.block_size = FLAGS_block_size;
2584 block_based_options.format_version =
2585 static_cast<uint32_t>(FLAGS_format_version);
2586 block_based_options.index_block_restart_interval =
2587 static_cast<int32_t>(FLAGS_index_block_restart_interval);
2588 block_based_options.filter_policy = filter_policy_;
2589 options_.table_factory.reset(
2590 NewBlockBasedTableFactory(block_based_options));
2591 options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
2592 options_.write_buffer_size = FLAGS_write_buffer_size;
2593 options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
2594 options_.min_write_buffer_number_to_merge =
2595 FLAGS_min_write_buffer_number_to_merge;
2596 options_.max_write_buffer_number_to_maintain =
2597 FLAGS_max_write_buffer_number_to_maintain;
2598 options_.memtable_prefix_bloom_size_ratio =
2599 FLAGS_memtable_prefix_bloom_size_ratio;
494da23a
TL
2600 options_.memtable_whole_key_filtering =
2601 FLAGS_memtable_whole_key_filtering;
11fdf7f2
TL
2602 options_.max_background_compactions = FLAGS_max_background_compactions;
2603 options_.max_background_flushes = FLAGS_max_background_flushes;
2604 options_.compaction_style =
2605 static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style);
2606 options_.prefix_extractor.reset(
2607 NewFixedPrefixTransform(FLAGS_prefix_size));
2608 options_.max_open_files = FLAGS_open_files;
2609 options_.statistics = dbstats;
2610 options_.env = FLAGS_env;
2611 options_.use_fsync = FLAGS_use_fsync;
2612 options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
2613 options_.allow_mmap_reads = FLAGS_mmap_read;
2614 options_.allow_mmap_writes = FLAGS_mmap_write;
2615 options_.use_direct_reads = FLAGS_use_direct_reads;
2616 options_.use_direct_io_for_flush_and_compaction =
2617 FLAGS_use_direct_io_for_flush_and_compaction;
494da23a
TL
2618 options_.recycle_log_file_num =
2619 static_cast<size_t>(FLAGS_recycle_log_file_num);
11fdf7f2
TL
2620 options_.target_file_size_base = FLAGS_target_file_size_base;
2621 options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
2622 options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2623 options_.max_bytes_for_level_multiplier =
2624 FLAGS_max_bytes_for_level_multiplier;
2625 options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
2626 options_.level0_slowdown_writes_trigger =
2627 FLAGS_level0_slowdown_writes_trigger;
2628 options_.level0_file_num_compaction_trigger =
2629 FLAGS_level0_file_num_compaction_trigger;
2630 options_.compression = FLAGS_compression_type_e;
2631 options_.compression_opts.max_dict_bytes =
2632 FLAGS_compression_max_dict_bytes;
2633 options_.compression_opts.zstd_max_train_bytes =
2634 FLAGS_compression_zstd_max_train_bytes;
2635 options_.create_if_missing = true;
2636 options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
2637 options_.inplace_update_support = FLAGS_in_place_update;
2638 options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2639 options_.allow_concurrent_memtable_write =
2640 FLAGS_allow_concurrent_memtable_write;
2641 options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
2642 options_.enable_write_thread_adaptive_yield =
2643 FLAGS_enable_write_thread_adaptive_yield;
2644 options_.compaction_options_universal.size_ratio =
2645 FLAGS_universal_size_ratio;
2646 options_.compaction_options_universal.min_merge_width =
2647 FLAGS_universal_min_merge_width;
2648 options_.compaction_options_universal.max_merge_width =
2649 FLAGS_universal_max_merge_width;
2650 options_.compaction_options_universal.max_size_amplification_percent =
2651 FLAGS_universal_max_size_amplification_percent;
494da23a 2652 options_.atomic_flush = FLAGS_atomic_flush;
11fdf7f2
TL
2653 } else {
2654#ifdef ROCKSDB_LITE
2655 fprintf(stderr, "--options_file not supported in lite mode\n");
2656 exit(1);
2657#else
2658 DBOptions db_options;
2659 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2660 Status s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(),
2661 &db_options, &cf_descriptors);
2662 if (!s.ok()) {
2663 fprintf(stderr, "Unable to load options file %s --- %s\n",
2664 FLAGS_options_file.c_str(), s.ToString().c_str());
2665 exit(1);
2666 }
2667 options_ = Options(db_options, cf_descriptors[0].options);
2668#endif // ROCKSDB_LITE
2669 }
2670
2671 if (FLAGS_rate_limiter_bytes_per_sec > 0) {
2672 options_.rate_limiter.reset(NewGenericRateLimiter(
2673 FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
2674 10 /* fairness */,
2675 FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
2676 : RateLimiter::Mode::kWritesOnly));
2677 if (FLAGS_rate_limit_bg_reads) {
2678 options_.new_table_reader_for_compaction_inputs = true;
2679 }
2680 }
7c673cae
FG
2681
2682 if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2683 fprintf(stderr,
2684 "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2685 exit(1);
2686 }
2687 if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2688 fprintf(stderr,
2689 "WARNING: prefix_size is non-zero but "
2690 "memtablerep != prefix_hash\n");
2691 }
2692 switch (FLAGS_rep_factory) {
2693 case kSkipList:
2694 // no need to do anything
2695 break;
2696#ifndef ROCKSDB_LITE
2697 case kHashSkipList:
2698 options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
2699 break;
2700 case kVectorRep:
2701 options_.memtable_factory.reset(new VectorRepFactory());
2702 break;
2703#else
2704 default:
2705 fprintf(stderr,
2706 "RocksdbLite only supports skip list mem table. Skip "
2707 "--rep_factory\n");
2708#endif // ROCKSDB_LITE
2709 }
2710
2711 if (FLAGS_use_full_merge_v1) {
2712 options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
2713 } else {
2714 options_.merge_operator = MergeOperators::CreatePutOperator();
2715 }
2716
7c673cae
FG
2717 fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2718
2719 Status s;
2720 if (FLAGS_ttl == -1) {
2721 std::vector<std::string> existing_column_families;
2722 s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2723 &existing_column_families); // ignore errors
2724 if (!s.ok()) {
2725 // DB doesn't exist
2726 assert(existing_column_families.empty());
2727 assert(column_family_names_.empty());
2728 column_family_names_.push_back(kDefaultColumnFamilyName);
2729 } else if (column_family_names_.empty()) {
2730 // this is the first call to the function Open()
2731 column_family_names_ = existing_column_families;
2732 } else {
2733 // this is a reopen. just assert that existing column_family_names are
2734 // equivalent to what we remember
2735 auto sorted_cfn = column_family_names_;
2736 std::sort(sorted_cfn.begin(), sorted_cfn.end());
2737 std::sort(existing_column_families.begin(),
2738 existing_column_families.end());
2739 if (sorted_cfn != existing_column_families) {
2740 fprintf(stderr,
2741 "Expected column families differ from the existing:\n");
2742 printf("Expected: {");
2743 for (auto cf : sorted_cfn) {
2744 printf("%s ", cf.c_str());
2745 }
2746 printf("}\n");
2747 printf("Existing: {");
2748 for (auto cf : existing_column_families) {
2749 printf("%s ", cf.c_str());
2750 }
2751 printf("}\n");
2752 }
2753 assert(sorted_cfn == existing_column_families);
2754 }
2755 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2756 for (auto name : column_family_names_) {
2757 if (name != kDefaultColumnFamilyName) {
2758 new_column_family_name_ =
2759 std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2760 }
2761 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2762 }
2763 while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2764 std::string name = ToString(new_column_family_name_.load());
2765 new_column_family_name_++;
2766 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2767 column_family_names_.push_back(name);
2768 }
2769 options_.listeners.clear();
2770 options_.listeners.emplace_back(
11fdf7f2 2771 new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
7c673cae 2772 options_.create_missing_column_families = true;
11fdf7f2 2773 if (!FLAGS_use_txn) {
494da23a
TL
2774 if (db_preload_finished_.load() && FLAGS_read_only) {
2775 s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
2776 &column_families_, &db_);
2777 } else {
2778 s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2779 &column_families_, &db_);
2780 }
11fdf7f2
TL
2781 } else {
2782#ifndef ROCKSDB_LITE
2783 TransactionDBOptions txn_db_options;
2784 // For the moment it is sufficient to test WRITE_PREPARED policy
2785 txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
2786 s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
2787 cf_descriptors, &column_families_, &txn_db_);
2788 db_ = txn_db_;
2789 // after a crash, rollback to commit recovered transactions
2790 std::vector<Transaction*> trans;
2791 txn_db_->GetAllPreparedTransactions(&trans);
2792 Random rand(static_cast<uint32_t>(FLAGS_seed));
2793 for (auto txn : trans) {
2794 if (rand.OneIn(2)) {
2795 s = txn->Commit();
2796 assert(s.ok());
2797 } else {
2798 s = txn->Rollback();
2799 assert(s.ok());
2800 }
2801 delete txn;
2802 }
2803 trans.clear();
2804 txn_db_->GetAllPreparedTransactions(&trans);
2805 assert(trans.size() == 0);
2806#endif
2807 }
7c673cae
FG
2808 assert(!s.ok() || column_families_.size() ==
2809 static_cast<size_t>(FLAGS_column_families));
2810 } else {
2811#ifndef ROCKSDB_LITE
2812 DBWithTTL* db_with_ttl;
2813 s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
2814 db_ = db_with_ttl;
2815#else
2816 fprintf(stderr, "TTL is not supported in RocksDBLite\n");
2817 exit(1);
2818#endif
2819 }
2820 if (!s.ok()) {
2821 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2822 exit(1);
2823 }
2824 }
2825
2826 void Reopen() {
2827 for (auto cf : column_families_) {
2828 delete cf;
2829 }
2830 column_families_.clear();
2831 delete db_;
2832 db_ = nullptr;
11fdf7f2
TL
2833#ifndef ROCKSDB_LITE
2834 txn_db_ = nullptr;
2835#endif
7c673cae
FG
2836
2837 num_times_reopened_++;
2838 auto now = FLAGS_env->NowMicros();
2839 fprintf(stdout, "%s Reopening database for the %dth time\n",
2840 FLAGS_env->TimeToString(now/1000000).c_str(),
2841 num_times_reopened_);
2842 Open();
2843 }
2844
2845 void PrintStatistics() {
2846 if (dbstats) {
2847 fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
2848 }
2849 }
2850
7c673cae
FG
2851 std::shared_ptr<Cache> cache_;
2852 std::shared_ptr<Cache> compressed_cache_;
2853 std::shared_ptr<const FilterPolicy> filter_policy_;
2854 DB* db_;
11fdf7f2
TL
2855#ifndef ROCKSDB_LITE
2856 TransactionDB* txn_db_;
2857#endif
7c673cae
FG
2858 Options options_;
2859 std::vector<ColumnFamilyHandle*> column_families_;
2860 std::vector<std::string> column_family_names_;
2861 std::atomic<int> new_column_family_name_;
2862 int num_times_reopened_;
2863 std::unordered_map<std::string, std::vector<std::string>> options_table_;
2864 std::vector<std::string> options_index_;
494da23a 2865 std::atomic<bool> db_preload_finished_;
7c673cae
FG
2866};
2867
11fdf7f2
TL
2868class NonBatchedOpsStressTest : public StressTest {
2869 public:
2870 NonBatchedOpsStressTest() {}
7c673cae 2871
11fdf7f2 2872 virtual ~NonBatchedOpsStressTest() {}
7c673cae 2873
11fdf7f2
TL
2874 virtual void VerifyDb(ThreadState* thread) const {
2875 ReadOptions options(FLAGS_verify_checksum, true);
2876 auto shared = thread->shared;
2877 const int64_t max_key = shared->GetMaxKey();
2878 const int64_t keys_per_thread = max_key / shared->GetNumThreads();
2879 int64_t start = keys_per_thread * thread->tid;
2880 int64_t end = start + keys_per_thread;
2881 if (thread->tid == shared->GetNumThreads() - 1) {
2882 end = max_key;
2883 }
2884 for (size_t cf = 0; cf < column_families_.size(); ++cf) {
2885 if (thread->shared->HasVerificationFailedYet()) {
2886 break;
2887 }
2888 if (!thread->rand.OneIn(2)) {
2889 // Use iterator to verify this range
494da23a 2890 std::unique_ptr<Iterator> iter(
11fdf7f2
TL
2891 db_->NewIterator(options, column_families_[cf]));
2892 iter->Seek(Key(start));
2893 for (auto i = start; i < end; i++) {
2894 if (thread->shared->HasVerificationFailedYet()) {
2895 break;
2896 }
2897 // TODO(ljin): update "long" to uint64_t
2898 // Reseek when the prefix changes
2899 if (i % (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size)) ==
2900 0) {
2901 iter->Seek(Key(i));
2902 }
2903 std::string from_db;
2904 std::string keystr = Key(i);
2905 Slice k = keystr;
2906 Status s = iter->status();
2907 if (iter->Valid()) {
2908 if (iter->key().compare(k) > 0) {
2909 s = Status::NotFound(Slice());
2910 } else if (iter->key().compare(k) == 0) {
2911 from_db = iter->value().ToString();
2912 iter->Next();
2913 } else if (iter->key().compare(k) < 0) {
2914 VerificationAbort(shared, "An out of range key was found",
2915 static_cast<int>(cf), i);
2916 }
2917 } else {
2918 // The iterator found no value for the key in question, so do not
2919 // move to the next item in the iterator
2920 s = Status::NotFound(Slice());
2921 }
2922 VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
2923 true);
2924 if (from_db.length()) {
2925 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
2926 from_db.data(), from_db.length());
2927 }
2928 }
2929 } else {
2930 // Use Get to verify this range
2931 for (auto i = start; i < end; i++) {
2932 if (thread->shared->HasVerificationFailedYet()) {
2933 break;
2934 }
2935 std::string from_db;
2936 std::string keystr = Key(i);
2937 Slice k = keystr;
2938 Status s = db_->Get(options, column_families_[cf], k, &from_db);
2939 VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
2940 true);
2941 if (from_db.length()) {
2942 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
2943 from_db.data(), from_db.length());
2944 }
2945 }
2946 }
2947 }
7c673cae 2948 }
11fdf7f2
TL
2949
2950 virtual void MaybeClearOneColumnFamily(ThreadState* thread) {
2951 if (FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) {
2952 if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) {
2953 // drop column family and then create it again (can't drop default)
2954 int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
2955 std::string new_name =
2956 ToString(new_column_family_name_.fetch_add(1));
2957 {
2958 MutexLock l(thread->shared->GetMutex());
2959 fprintf(
2960 stdout,
2961 "[CF %d] Dropping and recreating column family. new name: %s\n",
2962 cf, new_name.c_str());
2963 }
2964 thread->shared->LockColumnFamily(cf);
2965 Status s = db_->DropColumnFamily(column_families_[cf]);
2966 delete column_families_[cf];
2967 if (!s.ok()) {
2968 fprintf(stderr, "dropping column family error: %s\n",
2969 s.ToString().c_str());
2970 std::terminate();
2971 }
2972 s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
2973 &column_families_[cf]);
2974 column_family_names_[cf] = new_name;
2975 thread->shared->ClearColumnFamily(cf);
2976 if (!s.ok()) {
2977 fprintf(stderr, "creating column family error: %s\n",
2978 s.ToString().c_str());
2979 std::terminate();
2980 }
2981 thread->shared->UnlockColumnFamily(cf);
2982 }
2983 }
7c673cae 2984 }
7c673cae 2985
11fdf7f2 2986 virtual bool ShouldAcquireMutexOnKey() const { return true; }
7c673cae 2987
11fdf7f2
TL
2988 virtual Status TestGet(ThreadState* thread,
2989 const ReadOptions& read_opts,
2990 const std::vector<int>& rand_column_families,
2991 const std::vector<int64_t>& rand_keys) {
2992 auto cfh = column_families_[rand_column_families[0]];
2993 std::string key_str = Key(rand_keys[0]);
2994 Slice key = key_str;
2995 std::string from_db;
2996 Status s = db_->Get(read_opts, cfh, key, &from_db);
2997 if (s.ok()) {
2998 // found case
2999 thread->stats.AddGets(1, 1);
3000 } else if (s.IsNotFound()) {
3001 // not found case
3002 thread->stats.AddGets(1, 0);
3003 } else {
3004 // errors case
3005 thread->stats.AddErrors(1);
3006 }
3007 return s;
7c673cae 3008 }
11fdf7f2
TL
3009
3010 virtual Status TestPrefixScan(ThreadState* thread,
3011 const ReadOptions& read_opts,
3012 const std::vector<int>& rand_column_families,
3013 const std::vector<int64_t>& rand_keys) {
3014 auto cfh = column_families_[rand_column_families[0]];
3015 std::string key_str = Key(rand_keys[0]);
3016 Slice key = key_str;
3017 Slice prefix = Slice(key.data(), FLAGS_prefix_size);
3018
3019 std::string upper_bound;
3020 Slice ub_slice;
3021 ReadOptions ro_copy = read_opts;
3022 if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
3023 // For half of the time, set the upper bound to the next prefix
3024 ub_slice = Slice(upper_bound);
3025 ro_copy.iterate_upper_bound = &ub_slice;
3026 }
3027
3028 Iterator* iter = db_->NewIterator(ro_copy, cfh);
494da23a 3029 long count = 0;
11fdf7f2
TL
3030 for (iter->Seek(prefix);
3031 iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
3032 ++count;
3033 }
494da23a 3034 assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
11fdf7f2
TL
3035 Status s = iter->status();
3036 if (iter->status().ok()) {
494da23a 3037 thread->stats.AddPrefixes(1, count);
11fdf7f2
TL
3038 } else {
3039 thread->stats.AddErrors(1);
3040 }
3041 delete iter;
3042 return s;
3043 }
3044
3045 virtual Status TestPut(ThreadState* thread,
3046 WriteOptions& write_opts, const ReadOptions& read_opts,
3047 const std::vector<int>& rand_column_families,
3048 const std::vector<int64_t>& rand_keys,
3049 char (&value) [100], std::unique_ptr<MutexLock>& lock) {
3050 auto shared = thread->shared;
3051 int64_t max_key = shared->GetMaxKey();
3052 int64_t rand_key = rand_keys[0];
3053 int rand_column_family = rand_column_families[0];
3054 while (!shared->AllowsOverwrite(rand_key) &&
3055 (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
3056 lock.reset();
3057 rand_key = thread->rand.Next() % max_key;
3058 rand_column_family = thread->rand.Next() % FLAGS_column_families;
3059 lock.reset(new MutexLock(
3060 shared->GetMutexForKey(rand_column_family, rand_key)));
3061 }
3062
3063 std::string key_str = Key(rand_key);
3064 Slice key = key_str;
3065 ColumnFamilyHandle* cfh = column_families_[rand_column_family];
3066
3067 if (FLAGS_verify_before_write) {
3068 std::string key_str2 = Key(rand_key);
3069 Slice k = key_str2;
3070 std::string from_db;
3071 Status s = db_->Get(read_opts, cfh, k, &from_db);
3072 if (!VerifyValue(rand_column_family, rand_key, read_opts, shared,
3073 from_db, s, true)) {
3074 return s;
3075 }
3076 }
3077 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
3078 size_t sz = GenerateValue(value_base, value, sizeof(value));
3079 Slice v(value, sz);
3080 shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
3081 Status s;
3082 if (FLAGS_use_merge) {
3083 if (!FLAGS_use_txn) {
3084 s = db_->Merge(write_opts, cfh, key, v);
3085 } else {
3086#ifndef ROCKSDB_LITE
3087 Transaction* txn;
3088 s = NewTxn(write_opts, &txn);
3089 if (s.ok()) {
3090 s = txn->Merge(cfh, key, v);
3091 if (s.ok()) {
3092 s = CommitTxn(txn);
3093 }
3094 }
3095#endif
3096 }
3097 } else {
3098 if (!FLAGS_use_txn) {
3099 s = db_->Put(write_opts, cfh, key, v);
3100 } else {
3101#ifndef ROCKSDB_LITE
3102 Transaction* txn;
3103 s = NewTxn(write_opts, &txn);
3104 if (s.ok()) {
3105 s = txn->Put(cfh, key, v);
3106 if (s.ok()) {
3107 s = CommitTxn(txn);
3108 }
3109 }
3110#endif
3111 }
3112 }
3113 shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
3114 if (!s.ok()) {
3115 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
3116 std::terminate();
3117 }
3118 thread->stats.AddBytesForWrites(1, sz);
3119 PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key),
3120 value, sz);
3121 return s;
3122 }
3123
3124 virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
3125 const std::vector<int>& rand_column_families,
3126 const std::vector<int64_t>& rand_keys,
3127 std::unique_ptr<MutexLock>& lock) {
3128 int64_t rand_key = rand_keys[0];
3129 int rand_column_family = rand_column_families[0];
3130 auto shared = thread->shared;
3131 int64_t max_key = shared->GetMaxKey();
3132
3133 // OPERATION delete
3134 // If the chosen key does not allow overwrite and it does not exist,
3135 // choose another key.
3136 while (!shared->AllowsOverwrite(rand_key) &&
3137 !shared->Exists(rand_column_family, rand_key)) {
3138 lock.reset();
3139 rand_key = thread->rand.Next() % max_key;
3140 rand_column_family = thread->rand.Next() % FLAGS_column_families;
3141 lock.reset(new MutexLock(
3142 shared->GetMutexForKey(rand_column_family, rand_key)));
3143 }
3144
3145 std::string key_str = Key(rand_key);
3146 Slice key = key_str;
3147 auto cfh = column_families_[rand_column_family];
3148
3149 // Use delete if the key may be overwritten and a single deletion
3150 // otherwise.
3151 Status s;
3152 if (shared->AllowsOverwrite(rand_key)) {
3153 shared->Delete(rand_column_family, rand_key, true /* pending */);
3154 if (!FLAGS_use_txn) {
3155 s = db_->Delete(write_opts, cfh, key);
3156 } else {
3157#ifndef ROCKSDB_LITE
3158 Transaction* txn;
3159 s = NewTxn(write_opts, &txn);
3160 if (s.ok()) {
3161 s = txn->Delete(cfh, key);
3162 if (s.ok()) {
3163 s = CommitTxn(txn);
3164 }
3165 }
3166#endif
3167 }
3168 shared->Delete(rand_column_family, rand_key, false /* pending */);
3169 thread->stats.AddDeletes(1);
3170 if (!s.ok()) {
3171 fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
3172 std::terminate();
3173 }
3174 } else {
3175 shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
3176 if (!FLAGS_use_txn) {
3177 s = db_->SingleDelete(write_opts, cfh, key);
3178 } else {
3179#ifndef ROCKSDB_LITE
3180 Transaction* txn;
3181 s = NewTxn(write_opts, &txn);
3182 if (s.ok()) {
3183 s = txn->SingleDelete(cfh, key);
3184 if (s.ok()) {
3185 s = CommitTxn(txn);
3186 }
3187 }
3188#endif
3189 }
3190 shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
3191 thread->stats.AddSingleDeletes(1);
3192 if (!s.ok()) {
3193 fprintf(stderr, "single delete error: %s\n",
3194 s.ToString().c_str());
3195 std::terminate();
3196 }
3197 }
3198 return s;
3199 }
3200
3201 virtual Status TestDeleteRange(ThreadState* thread,
3202 WriteOptions& write_opts,
3203 const std::vector<int>& rand_column_families,
3204 const std::vector<int64_t>& rand_keys,
3205 std::unique_ptr<MutexLock>& lock) {
3206 // OPERATION delete range
3207 std::vector<std::unique_ptr<MutexLock>> range_locks;
3208 // delete range does not respect disallowed overwrites. the keys for
3209 // which overwrites are disallowed are randomly distributed so it
3210 // could be expensive to find a range where each key allows
3211 // overwrites.
3212 int64_t rand_key = rand_keys[0];
3213 int rand_column_family = rand_column_families[0];
3214 auto shared = thread->shared;
3215 int64_t max_key = shared->GetMaxKey();
3216 if (rand_key > max_key - FLAGS_range_deletion_width) {
3217 lock.reset();
3218 rand_key = thread->rand.Next() %
3219 (max_key - FLAGS_range_deletion_width + 1);
3220 range_locks.emplace_back(new MutexLock(
3221 shared->GetMutexForKey(rand_column_family, rand_key)));
3222 } else {
3223 range_locks.emplace_back(std::move(lock));
3224 }
3225 for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
3226 if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
3227 range_locks.emplace_back(new MutexLock(
3228 shared->GetMutexForKey(rand_column_family, rand_key + j)));
3229 }
3230 }
3231 shared->DeleteRange(rand_column_family, rand_key,
3232 rand_key + FLAGS_range_deletion_width,
3233 true /* pending */);
3234
3235 std::string keystr = Key(rand_key);
3236 Slice key = keystr;
3237 auto cfh = column_families_[rand_column_family];
3238 std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
3239 Slice end_key = end_keystr;
3240 Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
3241 if (!s.ok()) {
3242 fprintf(stderr, "delete range error: %s\n",
3243 s.ToString().c_str());
3244 std::terminate();
3245 }
3246 int covered = shared->DeleteRange(
3247 rand_column_family, rand_key,
3248 rand_key + FLAGS_range_deletion_width, false /* pending */);
3249 thread->stats.AddRangeDeletions(1);
3250 thread->stats.AddCoveredByRangeDeletions(covered);
3251 return s;
3252 }
3253
3254#ifdef ROCKSDB_LITE
3255 virtual void TestIngestExternalFile(
3256 ThreadState* /* thread */,
3257 const std::vector<int>& /* rand_column_families */,
3258 const std::vector<int64_t>& /* rand_keys */,
3259 std::unique_ptr<MutexLock>& /* lock */) {
3260 assert(false);
3261 fprintf(stderr,
3262 "RocksDB lite does not support "
3263 "TestIngestExternalFile\n");
3264 std::terminate();
3265 }
3266#else
3267 virtual void TestIngestExternalFile(
3268 ThreadState* thread, const std::vector<int>& rand_column_families,
3269 const std::vector<int64_t>& rand_keys, std::unique_ptr<MutexLock>& lock) {
3270 const std::string sst_filename =
3271 FLAGS_db + "/." + ToString(thread->tid) + ".sst";
3272 Status s;
3273 if (FLAGS_env->FileExists(sst_filename).ok()) {
3274 // Maybe we terminated abnormally before, so cleanup to give this file
3275 // ingestion a clean slate
3276 s = FLAGS_env->DeleteFile(sst_filename);
3277 }
3278
3279 SstFileWriter sst_file_writer(EnvOptions(), options_);
3280 if (s.ok()) {
3281 s = sst_file_writer.Open(sst_filename);
3282 }
3283 int64_t key_base = rand_keys[0];
3284 int column_family = rand_column_families[0];
3285 std::vector<std::unique_ptr<MutexLock> > range_locks;
3286 std::vector<uint32_t> values;
3287 SharedState* shared = thread->shared;
3288
3289 // Grab locks, set pending state on expected values, and add keys
3290 for (int64_t key = key_base;
3291 s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
3292 shared->GetMaxKey());
3293 ++key) {
3294 if (key == key_base) {
3295 range_locks.emplace_back(std::move(lock));
3296 } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
3297 range_locks.emplace_back(
3298 new MutexLock(shared->GetMutexForKey(column_family, key)));
3299 }
3300
3301 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
3302 values.push_back(value_base);
3303 shared->Put(column_family, key, value_base, true /* pending */);
3304
3305 char value[100];
3306 size_t value_len = GenerateValue(value_base, value, sizeof(value));
3307 auto key_str = Key(key);
3308 s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
3309 }
3310
3311 if (s.ok()) {
3312 s = sst_file_writer.Finish();
3313 }
3314 if (s.ok()) {
3315 s = db_->IngestExternalFile(column_families_[column_family],
3316 {sst_filename}, IngestExternalFileOptions());
3317 }
3318 if (!s.ok()) {
3319 fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
3320 std::terminate();
3321 }
3322 int64_t key = key_base;
3323 for (int32_t value : values) {
3324 shared->Put(column_family, key, value, false /* pending */);
3325 ++key;
3326 }
3327 }
3328#endif // ROCKSDB_LITE
3329
3330 bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
3331 SharedState* shared, const std::string& value_from_db,
3332 Status s, bool strict = false) const {
3333 if (shared->HasVerificationFailedYet()) {
3334 return false;
3335 }
3336 // compare value_from_db with the value in the shared state
3337 char value[kValueMaxLen];
3338 uint32_t value_base = shared->Get(cf, key);
3339 if (value_base == SharedState::UNKNOWN_SENTINEL) {
3340 return true;
3341 }
3342 if (value_base == SharedState::DELETION_SENTINEL && !strict) {
3343 return true;
3344 }
3345
3346 if (s.ok()) {
3347 if (value_base == SharedState::DELETION_SENTINEL) {
3348 VerificationAbort(shared, "Unexpected value found", cf, key);
3349 return false;
3350 }
3351 size_t sz = GenerateValue(value_base, value, sizeof(value));
3352 if (value_from_db.length() != sz) {
3353 VerificationAbort(shared, "Length of value read is not equal", cf, key);
3354 return false;
3355 }
3356 if (memcmp(value_from_db.data(), value, sz) != 0) {
3357 VerificationAbort(shared, "Contents of value read don't match", cf,
3358 key);
3359 return false;
3360 }
3361 } else {
3362 if (value_base != SharedState::DELETION_SENTINEL) {
3363 VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
3364 return false;
3365 }
3366 }
3367 return true;
3368 }
3369};
3370
3371class BatchedOpsStressTest : public StressTest {
3372 public:
3373 BatchedOpsStressTest() {}
3374 virtual ~BatchedOpsStressTest() {}
3375
3376 // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
3377 // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
3378 // Also refer BatchedOpsStressTest::TestGet
3379 virtual Status TestPut(ThreadState* thread,
3380 WriteOptions& write_opts, const ReadOptions& /* read_opts */,
3381 const std::vector<int>& rand_column_families, const std::vector<int64_t>& rand_keys,
3382 char (&value)[100], std::unique_ptr<MutexLock>& /* lock */) {
3383 uint32_t value_base =
3384 thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
3385 size_t sz = GenerateValue(value_base, value, sizeof(value));
3386 Slice v(value, sz);
3387 std::string keys[10] = {"9", "8", "7", "6", "5",
3388 "4", "3", "2", "1", "0"};
3389 std::string values[10] = {"9", "8", "7", "6", "5",
3390 "4", "3", "2", "1", "0"};
3391 Slice value_slices[10];
3392 WriteBatch batch;
3393 Status s;
3394 auto cfh = column_families_[rand_column_families[0]];
3395 std::string key_str = Key(rand_keys[0]);
3396 for (int i = 0; i < 10; i++) {
3397 keys[i] += key_str;
3398 values[i] += v.ToString();
3399 value_slices[i] = values[i];
3400 if (FLAGS_use_merge) {
3401 batch.Merge(cfh, keys[i], value_slices[i]);
3402 } else {
3403 batch.Put(cfh, keys[i], value_slices[i]);
3404 }
3405 }
3406
3407 s = db_->Write(write_opts, &batch);
3408 if (!s.ok()) {
3409 fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
3410 thread->stats.AddErrors(1);
3411 } else {
3412 // we did 10 writes each of size sz + 1
3413 thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
3414 }
3415
3416 return s;
3417 }
3418
3419 // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
3420 // in DB atomically i.e in a single batch. Also refer MultiGet.
3421 virtual Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
3422 const std::vector<int>& rand_column_families,
3423 const std::vector<int64_t>& rand_keys,
3424 std::unique_ptr<MutexLock>& /* lock */) {
3425 std::string keys[10] = {"9", "7", "5", "3", "1",
3426 "8", "6", "4", "2", "0"};
3427
3428 WriteBatch batch;
3429 Status s;
3430 auto cfh = column_families_[rand_column_families[0]];
3431 std::string key_str = Key(rand_keys[0]);
3432 for (int i = 0; i < 10; i++) {
3433 keys[i] += key_str;
3434 batch.Delete(cfh, keys[i]);
3435 }
3436
3437 s = db_->Write(writeoptions, &batch);
3438 if (!s.ok()) {
3439 fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
3440 thread->stats.AddErrors(1);
3441 } else {
3442 thread->stats.AddDeletes(10);
3443 }
3444
3445 return s;
3446 }
3447
3448 virtual Status TestDeleteRange(ThreadState* /* thread */,
3449 WriteOptions& /* write_opts */,
3450 const std::vector<int>& /* rand_column_families */,
3451 const std::vector<int64_t>& /* rand_keys */,
3452 std::unique_ptr<MutexLock>& /* lock */) {
3453 assert(false);
3454 return Status::NotSupported("BatchedOpsStressTest does not support "
3455 "TestDeleteRange");
3456 }
3457
3458 virtual void TestIngestExternalFile(
3459 ThreadState* /* thread */,
3460 const std::vector<int>& /* rand_column_families */,
3461 const std::vector<int64_t>& /* rand_keys */,
3462 std::unique_ptr<MutexLock>& /* lock */) {
3463 assert(false);
3464 fprintf(stderr,
3465 "BatchedOpsStressTest does not support "
3466 "TestIngestExternalFile\n");
3467 std::terminate();
3468 }
3469
3470 // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
3471 // in the same snapshot, and verifies that all the values are of the form
3472 // "0"+V, "1"+V,..."9"+V.
3473 // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
3474 // the DB.
3475 virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
3476 const std::vector<int>& rand_column_families,
3477 const std::vector<int64_t>& rand_keys) {
3478 std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
3479 Slice key_slices[10];
3480 std::string values[10];
3481 ReadOptions readoptionscopy = readoptions;
3482 readoptionscopy.snapshot = db_->GetSnapshot();
3483 std::string key_str = Key(rand_keys[0]);
3484 Slice key = key_str;
3485 auto cfh = column_families_[rand_column_families[0]];
3486 std::string from_db;
3487 Status s;
3488 for (int i = 0; i < 10; i++) {
3489 keys[i] += key.ToString();
3490 key_slices[i] = keys[i];
3491 s = db_->Get(readoptionscopy, cfh, key_slices[i], &from_db);
3492 if (!s.ok() && !s.IsNotFound()) {
3493 fprintf(stderr, "get error: %s\n", s.ToString().c_str());
3494 values[i] = "";
3495 thread->stats.AddErrors(1);
3496 // we continue after error rather than exiting so that we can
3497 // find more errors if any
3498 } else if (s.IsNotFound()) {
3499 values[i] = "";
3500 thread->stats.AddGets(1, 0);
3501 } else {
3502 values[i] = from_db;
3503
3504 char expected_prefix = (keys[i])[0];
3505 char actual_prefix = (values[i])[0];
3506 if (actual_prefix != expected_prefix) {
3507 fprintf(stderr, "error expected prefix = %c actual = %c\n",
3508 expected_prefix, actual_prefix);
3509 }
3510 (values[i])[0] = ' '; // blank out the differing character
3511 thread->stats.AddGets(1, 1);
3512 }
3513 }
3514 db_->ReleaseSnapshot(readoptionscopy.snapshot);
3515
3516 // Now that we retrieved all values, check that they all match
3517 for (int i = 1; i < 10; i++) {
3518 if (values[i] != values[0]) {
3519 fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
3520 key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
3521 StringToHex(values[i]).c_str());
3522 // we continue after error rather than exiting so that we can
3523 // find more errors if any
3524 }
3525 }
3526
3527 return s;
3528 }
3529
  // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
  // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
  // of the key. Each of these 10 scans returns a series of values;
  // each series should be the same length, and it is verified for each
  // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
  // ASSUMES that MultiPut was used to put (K, V)
  virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
                                const std::vector<int>& rand_column_families,
                                const std::vector<int64_t>& rand_keys) {
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    auto cfh = column_families_[rand_column_families[0]];
    std::string prefixes[10] = {"0", "1", "2", "3", "4",
                                "5", "6", "7", "8", "9"};
    Slice prefix_slices[10];
    ReadOptions readoptionscopy[10];
    // One snapshot shared by all ten iterators so the scans are comparable.
    const Snapshot* snapshot = db_->GetSnapshot();
    Iterator* iters[10];
    // upper_bounds/ub_slices must outlive the iterators below:
    // ReadOptions::iterate_upper_bound stores a pointer into ub_slices[i].
    std::string upper_bounds[10];
    Slice ub_slices[10];
    Status s = Status::OK();
    for (int i = 0; i < 10; i++) {
      prefixes[i] += key.ToString();
      prefixes[i].resize(FLAGS_prefix_size);
      prefix_slices[i] = Slice(prefixes[i]);
      readoptionscopy[i] = readoptions;
      readoptionscopy[i].snapshot = snapshot;
      if (thread->rand.OneIn(2) &&
          GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
        // For half of the time, set the upper bound to the next prefix
        ub_slices[i] = Slice(upper_bounds[i]);
        readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
      }
      iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
      iters[i]->Seek(prefix_slices[i]);
    }

    // Advance all ten iterators in lockstep; iterator 0 drives the loop.
    long count = 0;
    while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
      count++;
      std::string values[10];
      // get list of all values for this iteration
      for (int i = 0; i < 10; i++) {
        // no iterator should finish before the first one
        assert(iters[i]->Valid() &&
               iters[i]->key().starts_with(prefix_slices[i]));
        values[i] = iters[i]->value().ToString();

        char expected_first = (prefixes[i])[0];
        char actual_first = (values[i])[0];

        if (actual_first != expected_first) {
          fprintf(stderr, "error expected first = %c actual = %c\n",
                  expected_first, actual_first);
        }
        (values[i])[0] = ' ';  // blank out the differing character
      }
      // make sure all values are equivalent
      for (int i = 0; i < 10; i++) {
        if (values[i] != values[0]) {
          fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n",
                  i, prefixes[i].c_str(), StringToHex(values[0]).c_str(),
                  StringToHex(values[i]).c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        }
        iters[i]->Next();
      }
    }

    // cleanup iterators and snapshot
    for (int i = 0; i < 10; i++) {
      // if the first iterator finished, they should have all finished
      assert(!iters[i]->Valid() ||
             !iters[i]->key().starts_with(prefix_slices[i]));
      assert(iters[i]->status().ok());
      delete iters[i];
    }
    db_->ReleaseSnapshot(snapshot);

    // NOTE(review): `s` is never reassigned after its OK() initialization in
    // this function, so the AddErrors branch below appears unreachable;
    // iterator failures are only caught by the asserts above (no-ops in
    // release builds). Consider folding iters[i]->status() into `s` --
    // confirm against upstream before changing behavior.
    if (s.ok()) {
      thread->stats.AddPrefixes(1, count);
    } else {
      thread->stats.AddErrors(1);
    }

    return s;
  }
3618
  // Intentionally empty: in batched-ops mode there is no whole-DB
  // verification pass; consistency is checked per-operation instead
  // (see TestGet / TestPrefixScan above).
  virtual void VerifyDb(ThreadState* /* thread */) const {}
3620};
3621
494da23a
TL
3622class AtomicFlushStressTest : public StressTest {
3623 public:
3624 AtomicFlushStressTest() : batch_id_(0) {}
3625
3626 virtual ~AtomicFlushStressTest() {}
3627
3628 virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
3629 const ReadOptions& /* read_opts */,
3630 const std::vector<int>& rand_column_families,
3631 const std::vector<int64_t>& rand_keys,
3632 char (&value)[100],
3633 std::unique_ptr<MutexLock>& /* lock */) {
3634 std::string key_str = Key(rand_keys[0]);
3635 Slice key = key_str;
3636 uint64_t value_base = batch_id_.fetch_add(1);
3637 size_t sz =
3638 GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
3639 Slice v(value, sz);
3640 WriteBatch batch;
3641 for (auto cf : rand_column_families) {
3642 ColumnFamilyHandle* cfh = column_families_[cf];
3643 if (FLAGS_use_merge) {
3644 batch.Merge(cfh, key, v);
3645 } else { /* !FLAGS_use_merge */
3646 batch.Put(cfh, key, v);
3647 }
3648 }
3649 Status s = db_->Write(write_opts, &batch);
3650 if (!s.ok()) {
3651 fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
3652 thread->stats.AddErrors(1);
3653 } else {
3654 auto num = static_cast<long>(rand_column_families.size());
3655 thread->stats.AddBytesForWrites(num, (sz + 1) * num);
3656 }
3657
3658 return s;
3659 }
3660
3661 virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
3662 const std::vector<int>& rand_column_families,
3663 const std::vector<int64_t>& rand_keys,
3664 std::unique_ptr<MutexLock>& /* lock */) {
3665 std::string key_str = Key(rand_keys[0]);
3666 Slice key = key_str;
3667 WriteBatch batch;
3668 for (auto cf : rand_column_families) {
3669 ColumnFamilyHandle* cfh = column_families_[cf];
3670 batch.Delete(cfh, key);
3671 }
3672 Status s = db_->Write(write_opts, &batch);
3673 if (!s.ok()) {
3674 fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
3675 thread->stats.AddErrors(1);
3676 } else {
3677 thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
3678 }
3679 return s;
3680 }
3681
3682 virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
3683 const std::vector<int>& rand_column_families,
3684 const std::vector<int64_t>& rand_keys,
3685 std::unique_ptr<MutexLock>& /* lock */) {
3686 int64_t rand_key = rand_keys[0];
3687 auto shared = thread->shared;
3688 int64_t max_key = shared->GetMaxKey();
3689 if (rand_key > max_key - FLAGS_range_deletion_width) {
3690 rand_key =
3691 thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
3692 }
3693 std::string key_str = Key(rand_key);
3694 Slice key = key_str;
3695 std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
3696 Slice end_key = end_key_str;
3697 WriteBatch batch;
3698 for (auto cf : rand_column_families) {
3699 ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
3700 batch.DeleteRange(cfh, key, end_key);
3701 }
3702 Status s = db_->Write(write_opts, &batch);
3703 if (!s.ok()) {
3704 fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
3705 thread->stats.AddErrors(1);
3706 } else {
3707 thread->stats.AddRangeDeletions(
3708 static_cast<long>(rand_column_families.size()));
3709 }
3710 return s;
3711 }
3712
3713 virtual void TestIngestExternalFile(
3714 ThreadState* /* thread */,
3715 const std::vector<int>& /* rand_column_families */,
3716 const std::vector<int64_t>& /* rand_keys */,
3717 std::unique_ptr<MutexLock>& /* lock */) {
3718 assert(false);
3719 fprintf(stderr,
3720 "AtomicFlushStressTest does not support TestIngestExternalFile "
3721 "because it's not possible to verify the result\n");
3722 std::terminate();
3723 }
3724
3725 virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
3726 const std::vector<int>& rand_column_families,
3727 const std::vector<int64_t>& rand_keys) {
3728 std::string key_str = Key(rand_keys[0]);
3729 Slice key = key_str;
3730 auto cfh =
3731 column_families_[rand_column_families[thread->rand.Next() %
3732 rand_column_families.size()]];
3733 std::string from_db;
3734 Status s = db_->Get(readoptions, cfh, key, &from_db);
3735 if (s.ok()) {
3736 thread->stats.AddGets(1, 1);
3737 } else if (s.IsNotFound()) {
3738 thread->stats.AddGets(1, 0);
3739 } else {
3740 thread->stats.AddErrors(1);
3741 }
3742 return s;
3743 }
3744
3745 virtual Status TestPrefixScan(ThreadState* thread,
3746 const ReadOptions& readoptions,
3747 const std::vector<int>& rand_column_families,
3748 const std::vector<int64_t>& rand_keys) {
3749 std::string key_str = Key(rand_keys[0]);
3750 Slice key = key_str;
3751 Slice prefix = Slice(key.data(), FLAGS_prefix_size);
3752
3753 std::string upper_bound;
3754 Slice ub_slice;
3755 ReadOptions ro_copy = readoptions;
3756 if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
3757 ub_slice = Slice(upper_bound);
3758 ro_copy.iterate_upper_bound = &ub_slice;
3759 }
3760 auto cfh =
3761 column_families_[rand_column_families[thread->rand.Next() %
3762 rand_column_families.size()]];
3763 Iterator* iter = db_->NewIterator(ro_copy, cfh);
3764 long count = 0;
3765 for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
3766 iter->Next()) {
3767 ++count;
3768 }
3769 assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
3770 Status s = iter->status();
3771 if (s.ok()) {
3772 thread->stats.AddPrefixes(1, count);
3773 } else {
3774 thread->stats.AddErrors(1);
3775 }
3776 delete iter;
3777 return s;
3778 }
3779
3780#ifdef ROCKSDB_LITE
3781 virtual Status TestCheckpoint(
3782 ThreadState* /* thread */,
3783 const std::vector<int>& /* rand_column_families */,
3784 const std::vector<int64_t>& /* rand_keys */) {
3785 assert(false);
3786 fprintf(stderr,
3787 "RocksDB lite does not support "
3788 "TestCheckpoint\n");
3789 std::terminate();
3790 }
3791#else
3792 virtual Status TestCheckpoint(
3793 ThreadState* thread, const std::vector<int>& /* rand_column_families */,
3794 const std::vector<int64_t>& /* rand_keys */) {
3795 std::string checkpoint_dir =
3796 FLAGS_db + "/.checkpoint" + ToString(thread->tid);
3797 DestroyDB(checkpoint_dir, Options());
3798 Checkpoint* checkpoint = nullptr;
3799 Status s = Checkpoint::Create(db_, &checkpoint);
3800 if (s.ok()) {
3801 s = checkpoint->CreateCheckpoint(checkpoint_dir);
3802 }
3803 std::vector<ColumnFamilyHandle*> cf_handles;
3804 DB* checkpoint_db = nullptr;
3805 if (s.ok()) {
3806 delete checkpoint;
3807 checkpoint = nullptr;
3808 Options options(options_);
3809 options.listeners.clear();
3810 std::vector<ColumnFamilyDescriptor> cf_descs;
3811 // TODO(ajkr): `column_family_names_` is not safe to access here when
3812 // `clear_column_family_one_in != 0`. But we can't easily switch to
3813 // `ListColumnFamilies` to get names because it won't necessarily give
3814 // the same order as `column_family_names_`.
3815 if (FLAGS_clear_column_family_one_in == 0) {
3816 for (const auto& name : column_family_names_) {
3817 cf_descs.emplace_back(name, ColumnFamilyOptions(options));
3818 }
3819 s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
3820 &cf_handles, &checkpoint_db);
3821 }
3822 }
3823 if (checkpoint_db != nullptr) {
3824 for (auto cfh : cf_handles) {
3825 delete cfh;
3826 }
3827 cf_handles.clear();
3828 delete checkpoint_db;
3829 checkpoint_db = nullptr;
3830 }
3831 DestroyDB(checkpoint_dir, Options());
3832 if (!s.ok()) {
3833 fprintf(stderr, "A checkpoint operation failed with: %s\n",
3834 s.ToString().c_str());
3835 }
3836 return s;
3837 }
3838#endif // !ROCKSDB_LITE
3839
3840 virtual void VerifyDb(ThreadState* thread) const {
3841 ReadOptions options(FLAGS_verify_checksum, true);
3842 // We must set total_order_seek to true because we are doing a SeekToFirst
3843 // on a column family whose memtables may support (by default) prefix-based
3844 // iterator. In this case, NewIterator with options.total_order_seek being
3845 // false returns a prefix-based iterator. Calling SeekToFirst using this
3846 // iterator causes the iterator to become invalid. That means we cannot
3847 // iterate the memtable using this iterator any more, although the memtable
3848 // contains the most up-to-date key-values.
3849 options.total_order_seek = true;
3850 assert(thread != nullptr);
3851 auto shared = thread->shared;
3852 std::vector<std::unique_ptr<Iterator> > iters(column_families_.size());
3853 for (size_t i = 0; i != column_families_.size(); ++i) {
3854 iters[i].reset(db_->NewIterator(options, column_families_[i]));
3855 }
3856 for (auto& iter : iters) {
3857 iter->SeekToFirst();
3858 }
3859 size_t num = column_families_.size();
3860 assert(num == iters.size());
3861 std::vector<Status> statuses(num, Status::OK());
3862 do {
3863 size_t valid_cnt = 0;
3864 size_t idx = 0;
3865 for (auto& iter : iters) {
3866 if (iter->Valid()) {
3867 ++valid_cnt;
3868 } else {
3869 statuses[idx] = iter->status();
3870 }
3871 ++idx;
3872 }
3873 if (valid_cnt == 0) {
3874 Status status;
3875 for (size_t i = 0; i != num; ++i) {
3876 const auto& s = statuses[i];
3877 if (!s.ok()) {
3878 status = s;
3879 fprintf(stderr, "Iterator on cf %s has error: %s\n",
3880 column_families_[i]->GetName().c_str(),
3881 s.ToString().c_str());
3882 shared->SetVerificationFailure();
3883 }
3884 }
3885 if (status.ok()) {
3886 fprintf(stdout, "Finished scanning all column families.\n");
3887 }
3888 break;
3889 } else if (valid_cnt != iters.size()) {
3890 for (size_t i = 0; i != num; ++i) {
3891 if (!iters[i]->Valid()) {
3892 if (statuses[i].ok()) {
3893 fprintf(stderr, "Finished scanning cf %s\n",
3894 column_families_[i]->GetName().c_str());
3895 } else {
3896 fprintf(stderr, "Iterator on cf %s has error: %s\n",
3897 column_families_[i]->GetName().c_str(),
3898 statuses[i].ToString().c_str());
3899 }
3900 } else {
3901 fprintf(stderr, "cf %s has remaining data to scan\n",
3902 column_families_[i]->GetName().c_str());
3903 }
3904 }
3905 shared->SetVerificationFailure();
3906 break;
3907 }
3908 // If the program reaches here, then all column families' iterators are
3909 // still valid.
3910 Slice key;
3911 Slice value;
3912 for (size_t i = 0; i != num; ++i) {
3913 if (i == 0) {
3914 key = iters[i]->key();
3915 value = iters[i]->value();
3916 } else {
3917 if (key.compare(iters[i]->key()) != 0) {
3918 fprintf(stderr, "Verification failed\n");
3919 fprintf(stderr, "cf%s: %s => %s\n",
3920 column_families_[0]->GetName().c_str(),
3921 key.ToString(true /* hex */).c_str(),
3922 value.ToString(/* hex */).c_str());
3923 fprintf(stderr, "cf%s: %s => %s\n",
3924 column_families_[i]->GetName().c_str(),
3925 iters[i]->key().ToString(true /* hex */).c_str(),
3926 iters[i]->value().ToString(true /* hex */).c_str());
3927 shared->SetVerificationFailure();
3928 }
3929 }
3930 }
3931 for (auto& iter : iters) {
3932 iter->Next();
3933 }
3934 } while (true);
3935 }
3936
3937 virtual std::vector<int> GenerateColumnFamilies(
3938 const int /* num_column_families */, int /* rand_column_family */) const {
3939 std::vector<int> ret;
3940 int num = static_cast<int>(column_families_.size());
3941 int k = 0;
3942 std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
3943 return ret;
3944 }
3945
3946 private:
3947 std::atomic<int64_t> batch_id_;
3948};
3949
11fdf7f2
TL
3950} // namespace rocksdb
3951
// Entry point: parses gflags, validates flag combinations, configures the
// environment, then runs the stress-test variant selected by the flags.
// Returns 0 on success, 1 on failure. Order matters throughout: flags must
// be parsed before any FLAGS_* is read, and the env must be configured
// before the test opens the DB.
int main(int argc, char** argv) {
  SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                  " [OPTIONS]...");
  ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }
  // Translate string-valued flags into their enum/factory equivalents.
  FLAGS_compression_type_e =
      StringToCompressionType(FLAGS_compression_type.c_str());
  FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str());
  if (!FLAGS_hdfs.empty()) {
    FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }
  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // The number of background threads should be at least as much the
  // max number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
  FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
                                  rocksdb::Env::Priority::BOTTOM);
  // --- Flag-combination validation: each bad combination exits(1). ---
  if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: prefixpercent is non-zero while prefix_size is "
            "not positive!\n");
    exit(1);
  }
  if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: please specify prefix_size for "
            "test_batches_snapshots test!\n");
    exit(1);
  }
  if (FLAGS_memtable_prefix_bloom_size_ratio > 0.0 && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: please specify positive prefix_size in order to use "
            "memtable_prefix_bloom_size_ratio\n");
    exit(1);
  }
  // Operation-mix percentages must partition 100% exactly.
  if ((FLAGS_readpercent + FLAGS_prefixpercent +
       FLAGS_writepercent + FLAGS_delpercent + FLAGS_delrangepercent +
       FLAGS_iterpercent) != 100) {
      fprintf(stderr,
              "Error: Read+Prefix+Write+Delete+DeleteRange+Iterate percents != "
              "100!\n");
      exit(1);
  }
  if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
      fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n");
      exit(1);
  }
  if ((unsigned)FLAGS_reopen >= FLAGS_ops_per_thread) {
      fprintf(stderr,
              "Error: #DB-reopens should be < ops_per_thread\n"
              "Provided reopens = %d and ops_per_thread = %lu\n",
              FLAGS_reopen,
              (unsigned long)FLAGS_ops_per_thread);
      exit(1);
  }
  if (FLAGS_test_batches_snapshots && FLAGS_delrangepercent > 0) {
    fprintf(stderr, "Error: nonzero delrangepercent unsupported in "
                    "test_batches_snapshots mode\n");
    exit(1);
  }
  if (FLAGS_active_width > FLAGS_max_key) {
    fprintf(stderr, "Error: active_width can be at most max_key\n");
    exit(1);
  } else if (FLAGS_active_width == 0) {
    // 0 means "unrestricted": use the whole key space.
    FLAGS_active_width = FLAGS_max_key;
  }
  if (FLAGS_value_size_mult * kRandomValueMaxFactor > kValueMaxLen) {
    fprintf(stderr, "Error: value_size_mult can be at most %d\n",
            kValueMaxLen / kRandomValueMaxFactor);
    exit(1);
  }
  if (FLAGS_use_merge && FLAGS_nooverwritepercent == 100) {
    fprintf(
        stderr,
        "Error: nooverwritepercent must not be 100 when using merge operands");
    exit(1);
  }
  if (FLAGS_ingest_external_file_one_in > 0 && FLAGS_nooverwritepercent > 0) {
    fprintf(stderr,
            "Error: nooverwritepercent must be 0 when using file ingestion\n");
    exit(1);
  }
  if (FLAGS_clear_column_family_one_in > 0 && FLAGS_backup_one_in > 0) {
    fprintf(stderr,
            "Error: clear_column_family_one_in must be 0 when using backup\n");
    exit(1);
  }
  // test_atomic_flush implies the atomic_flush DB option.
  if (FLAGS_test_atomic_flush) {
    FLAGS_atomic_flush = true;
  }
  if (FLAGS_read_only) {
    if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 ||
        FLAGS_delrangepercent != 0) {
      fprintf(stderr, "Error: updates are not supported in read only mode\n");
      exit(1);
    } else if (FLAGS_checkpoint_one_in > 0 &&
               FLAGS_clear_column_family_one_in > 0) {
      fprintf(stdout,
              "Warn: checkpoint won't be validated since column families may "
              "be dropped.\n");
    }
  }

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbstress";
    FLAGS_db = default_db_path;
  }

  // Configure the crash-injection ("kill point") machinery.
  rocksdb_kill_odds = FLAGS_kill_random_test;
  rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);

  // Select the test variant; atomic-flush takes precedence over
  // batches-snapshots, and non-batched ops is the default.
  std::unique_ptr<rocksdb::StressTest> stress;
  if (FLAGS_test_atomic_flush) {
    stress.reset(new rocksdb::AtomicFlushStressTest());
  } else if (FLAGS_test_batches_snapshots) {
    stress.reset(new rocksdb::BatchedOpsStressTest());
  } else {
    stress.reset(new rocksdb::NonBatchedOpsStressTest());
  }
  // Run() returns true on success; convert to a process exit code.
  if (stress->Run()) {
    return 0;
  } else {
    return 1;
  }
}
4084
4085#endif // GFLAGS