]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/tools/db_stress.cc
45a7c9a0d0a0c815380e322cc239fb477c237574
[ceph.git] / ceph / src / rocksdb / tools / db_stress.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // The test uses an array to compare against values written to the database.
11 // Keys written to the array are in 1:1 correspondence to the actual values in
12 // the database according to the formula in the function GenerateValue.
13
14 // Space is reserved in the array from 0 to FLAGS_max_key and values are
15 // randomly written/deleted/read from those positions. During verification we
16 // compare all the positions in the array. To shorten/elongate the running
17 // time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread,
18 // (sometimes also FLAGS_threads).
19 //
20 // NOTE that if FLAGS_test_batches_snapshots is set, the test will have
21 // different behavior. See comment of the flag for details.
22
23 #ifndef GFLAGS
24 #include <cstdio>
25 int main() {
26 fprintf(stderr, "Please install gflags to run rocksdb tools\n");
27 return 1;
28 }
29 #else
30
31 #ifndef __STDC_FORMAT_MACROS
32 #define __STDC_FORMAT_MACROS
33 #endif // __STDC_FORMAT_MACROS
34
35 #include <fcntl.h>
36 #include <inttypes.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <sys/types.h>
40 #include <algorithm>
41 #include <chrono>
42 #include <exception>
43 #include <queue>
44 #include <thread>
45
46 #include "db/db_impl.h"
47 #include "db/version_set.h"
48 #include "hdfs/env_hdfs.h"
49 #include "monitoring/histogram.h"
50 #include "options/options_helper.h"
51 #include "port/port.h"
52 #include "rocksdb/cache.h"
53 #include "rocksdb/env.h"
54 #include "rocksdb/slice.h"
55 #include "rocksdb/slice_transform.h"
56 #include "rocksdb/statistics.h"
57 #include "rocksdb/utilities/backupable_db.h"
58 #include "rocksdb/utilities/checkpoint.h"
59 #include "rocksdb/utilities/db_ttl.h"
60 #include "rocksdb/utilities/options_util.h"
61 #include "rocksdb/utilities/transaction.h"
62 #include "rocksdb/utilities/transaction_db.h"
63 #include "rocksdb/write_batch.h"
64 #include "util/coding.h"
65 #include "util/compression.h"
66 #include "util/crc32c.h"
67 #include "util/gflags_compat.h"
68 #include "util/logging.h"
69 #include "util/mutexlock.h"
70 #include "util/random.h"
71 #include "util/string_util.h"
72 // SyncPoint is not supported in Released Windows Mode.
73 #if !(defined NDEBUG) || !defined(OS_WIN)
74 #include "util/sync_point.h"
75 #endif // !(defined NDEBUG) || !defined(OS_WIN)
76 #include "util/testutil.h"
77
78 #include "utilities/merge_operators.h"
79
80 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
81 using GFLAGS_NAMESPACE::RegisterFlagValidator;
82 using GFLAGS_NAMESPACE::SetUsageMessage;
83
84 static const long KB = 1024;
85 static const int kRandomValueMaxFactor = 3;
86 static const int kValueMaxLen = 100;
87
// gflags validator: returns true iff `value` fits in a uint32_t. Used for
// 64-bit flags that are later narrowed to 32 bits (e.g. --seed).
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // Print with PRIu64 rather than casting to unsigned long: unsigned long
    // is only 32 bits wide on LLP64 platforms (e.g. Windows), so the cast
    // could truncate the very value we are reporting as too large.
    fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n",
            flagname, value);
    return false;
  }
  return true;
}
98
99 DEFINE_uint64(seed, 2341234, "Seed for PRNG");
100 static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
101 RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
102
103 DEFINE_int64(max_key, 1 * KB* KB,
104 "Max number of key/values to place in database");
105
106 DEFINE_int32(column_families, 10, "Number of column families");
107
108 DEFINE_string(
109 options_file, "",
110 "The path to a RocksDB options file. If specified, then db_stress will "
111 "run with the RocksDB options in the default column family of the "
112 "specified options file. Note that, when an options file is provided, "
113 "db_stress will ignore the flag values for all options that may be passed "
114 "via options file.");
115
116 DEFINE_int64(
117 active_width, 0,
118 "Number of keys in active span of the key-range at any given time. The "
119 "span begins with its left endpoint at key 0, gradually moves rightwards, "
120 "and ends with its right endpoint at max_key. If set to 0, active_width "
121 "will be sanitized to be equal to max_key.");
122
123 // TODO(noetzli) Add support for single deletes
124 DEFINE_bool(test_batches_snapshots, false,
125 "If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
126 " which read/write/delete multiple keys in a batch. In this mode,"
127 " we do not verify db content by comparing the content with the "
128 "pre-allocated array. Instead, we do partial verification inside"
129 " MultiGet() by checking various values in a batch. Benefit of"
130 " this mode:\n"
131 "\t(a) No need to acquire mutexes during writes (less cache "
132 "flushes in multi-core leading to speed up)\n"
133 "\t(b) No long validation at the end (more speed up)\n"
134 "\t(c) Test snapshot and atomicity of batch writes");
135
136 DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
137
138 DEFINE_int32(ttl, -1,
139 "Opens the db with this ttl value if this is not -1. "
140 "Carefully specify a large value such that verifications on "
141 "deleted values don't fail");
142
143 DEFINE_int32(value_size_mult, 8,
144 "Size of value will be this number times rand_int(1,3) bytes");
145
146 DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
147
148 DEFINE_bool(enable_pipelined_write, false, "Pipeline WAL/memtable writes");
149
150 DEFINE_bool(verify_before_write, false, "Verify before write");
151
152 DEFINE_bool(histogram, false, "Print histogram of operation timings");
153
154 DEFINE_bool(destroy_db_initially, true,
155 "Destroys the database dir before start if this is true");
156
157 DEFINE_bool(verbose, false, "Verbose");
158
159 DEFINE_bool(progress_reports, true,
160 "If true, db_stress will report number of finished operations");
161
162 DEFINE_uint64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
163 "Number of bytes to buffer in all memtables before compacting");
164
165 DEFINE_int32(write_buffer_size,
166 static_cast<int32_t>(rocksdb::Options().write_buffer_size),
167 "Number of bytes to buffer in memtable before compacting");
168
169 DEFINE_int32(max_write_buffer_number,
170 rocksdb::Options().max_write_buffer_number,
171 "The number of in-memory memtables. "
172 "Each memtable is of size FLAGS_write_buffer_size.");
173
174 DEFINE_int32(min_write_buffer_number_to_merge,
175 rocksdb::Options().min_write_buffer_number_to_merge,
176 "The minimum number of write buffers that will be merged together "
177 "before writing to storage. This is cheap because it is an "
178 "in-memory merge. If this feature is not enabled, then all these "
179 "write buffers are flushed to L0 as separate files and this "
180 "increases read amplification because a get request has to check "
181 "in all of these files. Also, an in-memory merge may result in "
182 "writing less data to storage if there are duplicate records in"
183 " each of these individual write buffers.");
184
185 DEFINE_int32(max_write_buffer_number_to_maintain,
186 rocksdb::Options().max_write_buffer_number_to_maintain,
187 "The total maximum number of write buffers to maintain in memory "
188 "including copies of buffers that have already been flushed. "
189 "Unlike max_write_buffer_number, this parameter does not affect "
190 "flushing. This controls the minimum amount of write history "
191 "that will be available in memory for conflict checking when "
192 "Transactions are used. If this value is too low, some "
193 "transactions may fail at commit time due to not being able to "
194 "determine whether there were any write conflicts. Setting this "
195 "value to 0 will cause write buffers to be freed immediately "
196 "after they are flushed. If this value is set to -1, "
197 "'max_write_buffer_number' will be used.");
198
199 DEFINE_double(memtable_prefix_bloom_size_ratio,
200 rocksdb::Options().memtable_prefix_bloom_size_ratio,
201 "creates prefix blooms for memtables, each with size "
202 "`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
203
204 DEFINE_int32(open_files, rocksdb::Options().max_open_files,
205 "Maximum number of files to keep open at the same time "
206 "(use default if == 0)");
207
208 DEFINE_int64(compressed_cache_size, -1,
209 "Number of bytes to use as a cache of compressed data."
210 " Negative means use default settings.");
211
212 DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, "");
213
214 DEFINE_int32(level0_file_num_compaction_trigger,
215 rocksdb::Options().level0_file_num_compaction_trigger,
216 "Level0 compaction start trigger");
217
218 DEFINE_int32(level0_slowdown_writes_trigger,
219 rocksdb::Options().level0_slowdown_writes_trigger,
220 "Number of files in level-0 that will slow down writes");
221
222 DEFINE_int32(level0_stop_writes_trigger,
223 rocksdb::Options().level0_stop_writes_trigger,
224 "Number of files in level-0 that will trigger put stop.");
225
226 DEFINE_int32(block_size,
227 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
228 "Number of bytes in a block.");
229
230 DEFINE_int32(
231 format_version,
232 static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
233 "Format version of SST files.");
234
235 DEFINE_int32(index_block_restart_interval,
236 rocksdb::BlockBasedTableOptions().index_block_restart_interval,
237 "Number of keys between restart points "
238 "for delta encoding of keys in index block.");
239
240 DEFINE_int32(max_background_compactions,
241 rocksdb::Options().max_background_compactions,
242 "The maximum number of concurrent background compactions "
243 "that can occur in parallel.");
244
245 DEFINE_int32(num_bottom_pri_threads, 0,
246 "The number of threads in the bottom-priority thread pool (used "
247 "by universal compaction only).");
248
249 DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
250 "The interval (in milliseconds) to adjust compaction thread pool "
251 "size. Don't change it periodically if the value is 0.");
252
253 DEFINE_int32(compaction_thread_pool_variations, 2,
254 "Range of background thread pool size variations when adjusted "
255 "periodically.");
256
257 DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
258 "The maximum number of concurrent background flushes "
259 "that can occur in parallel.");
260
261 DEFINE_int32(universal_size_ratio, 0, "The ratio of file sizes that trigger"
262 " compaction in universal style");
263
264 DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files to "
265 "compact in universal style compaction");
266
267 DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
268 " in universal style compaction");
269
270 DEFINE_int32(universal_max_size_amplification_percent, 0,
271 "The max size amplification for universal style compaction");
272
273 DEFINE_int32(clear_column_family_one_in, 1000000,
274 "With a chance of 1/N, delete a column family and then recreate "
275 "it again. If N == 0, never drop/create column families. "
276 "When test_batches_snapshots is true, this flag has no effect");
277
278 DEFINE_int32(set_options_one_in, 0,
279 "With a chance of 1/N, change some random options");
280
281 DEFINE_int32(set_in_place_one_in, 0,
282 "With a chance of 1/N, toggle in place support option");
283
284 DEFINE_int64(cache_size, 2LL * KB * KB * KB,
285 "Number of bytes to use as a cache of uncompressed data.");
286
287 DEFINE_bool(use_clock_cache, false,
288 "Replace default LRU block cache with clock cache.");
289
290 DEFINE_uint64(subcompactions, 1,
291 "Maximum number of subcompactions to divide L0-L1 compactions "
292 "into.");
293
294 DEFINE_bool(allow_concurrent_memtable_write, false,
295 "Allow multi-writers to update mem tables in parallel.");
296
297 DEFINE_bool(enable_write_thread_adaptive_yield, true,
298 "Use a yielding spin loop for brief writer thread waits.");
299
300 static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
301 RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
302
// gflags validator: accepts only non-negative int32 values.
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
  const bool ok = (value >= 0);
  if (!ok) {
    fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n", flagname,
            value);
  }
  return ok;
}
311 DEFINE_int32(reopen, 10, "Number of times database reopens");
312 static const bool FLAGS_reopen_dummy __attribute__((__unused__)) =
313 RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive);
314
315 DEFINE_int32(bloom_bits, 10, "Bloom filter bits per key. "
316 "Negative means use default settings.");
317
318 DEFINE_bool(use_block_based_filter, false, "use block based filter"
319 "instead of full filter for block based table");
320
321 DEFINE_string(db, "", "Use the db with the following name.");
322
323 DEFINE_string(
324 expected_values_path, "",
325 "File where the array of expected uint32_t values will be stored. If "
326 "provided and non-empty, the DB state will be verified against these "
327 "values after recovery. --max_key and --column_family must be kept the "
328 "same across invocations of this program that use the same "
329 "--expected_values_path.");
330
331 DEFINE_bool(verify_checksum, false,
332 "Verify checksum for every block read from storage");
333
334 DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
335 "Allow reads to occur via mmap-ing files");
336
337 DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
338 "Allow writes to occur via mmap-ing files");
339
340 DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
341 "Use O_DIRECT for reading data");
342
343 DEFINE_bool(use_direct_io_for_flush_and_compaction,
344 rocksdb::Options().use_direct_io_for_flush_and_compaction,
345 "Use O_DIRECT for writing data");
346
347 // Database statistics
348 static std::shared_ptr<rocksdb::Statistics> dbstats;
349 DEFINE_bool(statistics, false, "Create database statistics");
350
351 DEFINE_bool(sync, false, "Sync all writes to disk");
352
353 DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
354
355 DEFINE_int32(kill_random_test, 0,
356 "If non-zero, kill at various points in source code with "
357 "probability 1/this");
358 static const bool FLAGS_kill_random_test_dummy __attribute__((__unused__)) =
359 RegisterFlagValidator(&FLAGS_kill_random_test, &ValidateInt32Positive);
360 extern int rocksdb_kill_odds;
361
362 DEFINE_string(kill_prefix_blacklist, "",
363 "If non-empty, kill points with prefix in the list given will be"
364 " skipped. Items are comma-separated.");
365 extern std::vector<std::string> rocksdb_kill_prefix_blacklist;
366
367 DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
368
369 DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
370 "Target level-1 file size for compaction");
371
372 DEFINE_int32(target_file_size_multiplier, 1,
373 "A multiplier to compute target level-N file size (N >= 2)");
374
375 DEFINE_uint64(max_bytes_for_level_base,
376 rocksdb::Options().max_bytes_for_level_base,
377 "Max bytes for level-1");
378
379 DEFINE_double(max_bytes_for_level_multiplier, 2,
380 "A multiplier to compute max bytes for level-N (N >= 2)");
381
382 DEFINE_int32(range_deletion_width, 10,
383 "The width of the range deletion intervals.");
384
385 DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
386
387 DEFINE_bool(rate_limit_bg_reads, false,
388 "Use options.rate_limiter on compaction reads");
389
390 DEFINE_bool(use_txn, false,
391 "Use TransactionDB. Currently the default write policy is "
392 "TxnDBWritePolicy::WRITE_PREPARED");
393
394 DEFINE_int32(backup_one_in, 0,
395 "If non-zero, then CreateNewBackup() will be called once for "
396 "every N operations on average. 0 indicates CreateNewBackup() "
397 "is disabled.");
398
399 DEFINE_int32(checkpoint_one_in, 0,
400 "If non-zero, then CreateCheckpoint() will be called once for "
401 "every N operations on average. 0 indicates CreateCheckpoint() "
402 "is disabled.");
403
404 DEFINE_int32(ingest_external_file_one_in, 0,
405 "If non-zero, then IngestExternalFile() will be called once for "
406 "every N operations on average. 0 indicates IngestExternalFile() "
407 "is disabled.");
408
409 DEFINE_int32(ingest_external_file_width, 1000,
410 "The width of the ingested external files.");
411
412 DEFINE_int32(compact_files_one_in, 0,
413 "If non-zero, then CompactFiles() will be called once for every N "
414 "operations on average. 0 indicates CompactFiles() is disabled.");
415
416 DEFINE_int32(compact_range_one_in, 0,
417 "If non-zero, then CompactRange() will be called once for every N "
418 "operations on average. 0 indicates CompactRange() is disabled.");
419
420 DEFINE_int32(flush_one_in, 0,
421 "If non-zero, then Flush() will be called once for every N ops "
422 "on average. 0 indicates calls to Flush() are disabled.");
423
424 DEFINE_int32(compact_range_width, 10000,
425 "The width of the ranges passed to CompactRange().");
426
427 DEFINE_int32(acquire_snapshot_one_in, 0,
428 "If non-zero, then acquires a snapshot once every N operations on "
429 "average.");
430
431 DEFINE_bool(compare_full_db_state_snapshot, false,
432 "If set we compare state of entire db (in one of the threads) with"
433 "each snapshot.");
434
435 DEFINE_uint64(snapshot_hold_ops, 0,
436 "If non-zero, then releases snapshots N operations after they're "
437 "acquired.");
438
// gflags validator: accepts only percentages in the inclusive range [0, 100].
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value >= 0 && value <= 100) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname,
          value);
  return false;
}
447
448 DEFINE_int32(readpercent, 10,
449 "Ratio of reads to total workload (expressed as a percentage)");
450 static const bool FLAGS_readpercent_dummy __attribute__((__unused__)) =
451 RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent);
452
453 DEFINE_int32(prefixpercent, 20,
454 "Ratio of prefix iterators to total workload (expressed as a"
455 " percentage)");
456 static const bool FLAGS_prefixpercent_dummy __attribute__((__unused__)) =
457 RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent);
458
459 DEFINE_int32(writepercent, 45,
460 "Ratio of writes to total workload (expressed as a percentage)");
461 static const bool FLAGS_writepercent_dummy __attribute__((__unused__)) =
462 RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent);
463
464 DEFINE_int32(delpercent, 15,
465 "Ratio of deletes to total workload (expressed as a percentage)");
466 static const bool FLAGS_delpercent_dummy __attribute__((__unused__)) =
467 RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent);
468
469 DEFINE_int32(delrangepercent, 0,
470 "Ratio of range deletions to total workload (expressed as a "
471 "percentage). Cannot be used with test_batches_snapshots");
472 static const bool FLAGS_delrangepercent_dummy __attribute__((__unused__)) =
473 RegisterFlagValidator(&FLAGS_delrangepercent, &ValidateInt32Percent);
474
475 DEFINE_int32(nooverwritepercent, 60,
476 "Ratio of keys without overwrite to total workload (expressed as "
477 " a percentage)");
478 static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) =
479 RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent);
480
481 DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload"
482 " (expressed as a percentage)");
483 static const bool FLAGS_iterpercent_dummy __attribute__((__unused__)) =
484 RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent);
485
486 DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
487 static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) =
488 RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
489
490 namespace {
491 enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
492 assert(ctype);
493
494 if (!strcasecmp(ctype, "none"))
495 return rocksdb::kNoCompression;
496 else if (!strcasecmp(ctype, "snappy"))
497 return rocksdb::kSnappyCompression;
498 else if (!strcasecmp(ctype, "zlib"))
499 return rocksdb::kZlibCompression;
500 else if (!strcasecmp(ctype, "bzip2"))
501 return rocksdb::kBZip2Compression;
502 else if (!strcasecmp(ctype, "lz4"))
503 return rocksdb::kLZ4Compression;
504 else if (!strcasecmp(ctype, "lz4hc"))
505 return rocksdb::kLZ4HCCompression;
506 else if (!strcasecmp(ctype, "xpress"))
507 return rocksdb::kXpressCompression;
508 else if (!strcasecmp(ctype, "zstd"))
509 return rocksdb::kZSTD;
510
511 fprintf(stderr, "Cannot parse compression type '%s'\n", ctype);
512 return rocksdb::kSnappyCompression; //default value
513 }
514
515 enum rocksdb::ChecksumType StringToChecksumType(const char* ctype) {
516 assert(ctype);
517 auto iter = rocksdb::checksum_type_string_map.find(ctype);
518 if (iter != rocksdb::checksum_type_string_map.end()) {
519 return iter->second;
520 }
521 fprintf(stderr, "Cannot parse checksum type '%s'\n", ctype);
522 return rocksdb::kCRC32c;
523 }
524
525 std::string ChecksumTypeToString(rocksdb::ChecksumType ctype) {
526 auto iter = std::find_if(
527 rocksdb::checksum_type_string_map.begin(),
528 rocksdb::checksum_type_string_map.end(),
529 [&](const std::pair<std::string, rocksdb::ChecksumType>&
530 name_and_enum_val) { return name_and_enum_val.second == ctype; });
531 assert(iter != rocksdb::checksum_type_string_map.end());
532 return iter->first;
533 }
534
535 std::vector<std::string> SplitString(std::string src) {
536 std::vector<std::string> ret;
537 if (src.empty()) {
538 return ret;
539 }
540 size_t pos = 0;
541 size_t pos_comma;
542 while ((pos_comma = src.find(',', pos)) != std::string::npos) {
543 ret.push_back(src.substr(pos, pos_comma - pos));
544 pos = pos_comma + 1;
545 }
546 ret.push_back(src.substr(pos, src.length()));
547 return ret;
548 }
549 } // namespace
550
551 DEFINE_string(compression_type, "snappy",
552 "Algorithm to use to compress the database");
553 static enum rocksdb::CompressionType FLAGS_compression_type_e =
554 rocksdb::kSnappyCompression;
555
556 DEFINE_int32(compression_max_dict_bytes, 0,
557 "Maximum size of dictionary used to prime the compression "
558 "library.");
559
560 DEFINE_int32(compression_zstd_max_train_bytes, 0,
561 "Maximum size of training data passed to zstd's dictionary "
562 "trainer.");
563
564 DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
565 static enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c;
566
567 DEFINE_string(hdfs, "", "Name of hdfs environment");
568 // posix or hdfs environment
569 static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
570
571 DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
572 static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) =
573 RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);
574
575 DEFINE_uint64(log2_keys_per_lock, 2, "Log2 of number of keys per lock");
576 static const bool FLAGS_log2_keys_per_lock_dummy __attribute__((__unused__)) =
577 RegisterFlagValidator(&FLAGS_log2_keys_per_lock, &ValidateUint32Range);
578
579 DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file");
580
581 DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
582
// Memtable representation factories selectable via the --memtablerep flag
// (parsed by StringToRepFactory below).
583 enum RepFactory {
584   kSkipList,
585   kHashSkipList,
586   kVectorRep
587 };
588
589 namespace {
590 enum RepFactory StringToRepFactory(const char* ctype) {
591 assert(ctype);
592
593 if (!strcasecmp(ctype, "skip_list"))
594 return kSkipList;
595 else if (!strcasecmp(ctype, "prefix_hash"))
596 return kHashSkipList;
597 else if (!strcasecmp(ctype, "vector"))
598 return kVectorRep;
599
600 fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
601 return kSkipList;
602 }
603
604 #ifdef _MSC_VER
605 #pragma warning(push)
606 // truncation of constant value on static_cast
607 #pragma warning(disable : 4309)
608 #endif
609 bool GetNextPrefix(const rocksdb::Slice& src, std::string* v) {
610 std::string ret = src.ToString();
611 for (int i = static_cast<int>(ret.size()) - 1; i >= 0; i--) {
612 if (ret[i] != static_cast<char>(255)) {
613 ret[i] = ret[i] + 1;
614 break;
615 } else if (i != 0) {
616 ret[i] = 0;
617 } else {
618 // all FF. No next prefix
619 return false;
620 }
621 }
622 *v = ret;
623 return true;
624 }
625 #ifdef _MSC_VER
626 #pragma warning(pop)
627 #endif
628 } // namespace
629
630 static enum RepFactory FLAGS_rep_factory;
631 DEFINE_string(memtablerep, "prefix_hash", "");
632
633 static bool ValidatePrefixSize(const char* flagname, int32_t value) {
634 if (value < 0 || value > 8) {
635 fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n",
636 flagname, value);
637 return false;
638 }
639 return true;
640 }
641 DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep");
642 static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) =
643 RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
644
645 DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge "
646 "that behaves like a Put");
647
648 DEFINE_bool(use_full_merge_v1, false,
649 "On true, use a merge operator that implement the deprecated "
650 "version of FullMerge");
651
652 namespace rocksdb {
653
654 // convert long to a big-endian slice key
655 static std::string Key(int64_t val) {
656 std::string little_endian_key;
657 std::string big_endian_key;
658 PutFixed64(&little_endian_key, val);
659 assert(little_endian_key.size() == sizeof(val));
660 big_endian_key.resize(sizeof(val));
661 for (size_t i = 0 ; i < sizeof(val); ++i) {
662 big_endian_key[i] = little_endian_key[sizeof(val) - 1 - i];
663 }
664 return big_endian_key;
665 }
666
667 static bool GetIntVal(std::string big_endian_key, uint64_t *key_p) {
668 unsigned int size_key = sizeof(*key_p);
669 assert(big_endian_key.size() == size_key);
670 std::string little_endian_key;
671 little_endian_key.resize(size_key);
672 for (size_t i = 0 ; i < size_key; ++i) {
673 little_endian_key[i] = big_endian_key[size_key - 1 - i];
674 }
675 Slice little_endian_slice = Slice(little_endian_key);
676 return GetFixed64(&little_endian_slice, key_p);
677 }
678
679 static std::string StringToHex(const std::string& str) {
680 std::string result = "0x";
681 result.append(Slice(str).ToString(true));
682 return result;
683 }
684
685
686 class StressTest;
687 namespace {
688
689 class Stats {
690 private:
691 uint64_t start_;
692 uint64_t finish_;
693 double seconds_;
694 long done_;
695 long gets_;
696 long prefixes_;
697 long writes_;
698 long deletes_;
699 size_t single_deletes_;
700 long iterator_size_sums_;
701 long founds_;
702 long iterations_;
703 long range_deletions_;
704 long covered_by_range_deletions_;
705 long errors_;
706 long num_compact_files_succeed_;
707 long num_compact_files_failed_;
708 int next_report_;
709 size_t bytes_;
710 uint64_t last_op_finish_;
711 HistogramImpl hist_;
712
713 public:
714 Stats() { }
715
716 void Start() {
717 next_report_ = 100;
718 hist_.Clear();
719 done_ = 0;
720 gets_ = 0;
721 prefixes_ = 0;
722 writes_ = 0;
723 deletes_ = 0;
724 single_deletes_ = 0;
725 iterator_size_sums_ = 0;
726 founds_ = 0;
727 iterations_ = 0;
728 range_deletions_ = 0;
729 covered_by_range_deletions_ = 0;
730 errors_ = 0;
731 bytes_ = 0;
732 seconds_ = 0;
733 num_compact_files_succeed_ = 0;
734 num_compact_files_failed_ = 0;
735 start_ = FLAGS_env->NowMicros();
736 last_op_finish_ = start_;
737 finish_ = start_;
738 }
739
740 void Merge(const Stats& other) {
741 hist_.Merge(other.hist_);
742 done_ += other.done_;
743 gets_ += other.gets_;
744 prefixes_ += other.prefixes_;
745 writes_ += other.writes_;
746 deletes_ += other.deletes_;
747 single_deletes_ += other.single_deletes_;
748 iterator_size_sums_ += other.iterator_size_sums_;
749 founds_ += other.founds_;
750 iterations_ += other.iterations_;
751 range_deletions_ += other.range_deletions_;
752 covered_by_range_deletions_ = other.covered_by_range_deletions_;
753 errors_ += other.errors_;
754 bytes_ += other.bytes_;
755 seconds_ += other.seconds_;
756 num_compact_files_succeed_ += other.num_compact_files_succeed_;
757 num_compact_files_failed_ += other.num_compact_files_failed_;
758 if (other.start_ < start_) start_ = other.start_;
759 if (other.finish_ > finish_) finish_ = other.finish_;
760 }
761
762 void Stop() {
763 finish_ = FLAGS_env->NowMicros();
764 seconds_ = (finish_ - start_) * 1e-6;
765 }
766
767 void FinishedSingleOp() {
768 if (FLAGS_histogram) {
769 auto now = FLAGS_env->NowMicros();
770 auto micros = now - last_op_finish_;
771 hist_.Add(micros);
772 if (micros > 20000) {
773 fprintf(stdout, "long op: %" PRIu64 " micros%30s\r", micros, "");
774 }
775 last_op_finish_ = now;
776 }
777
778 done_++;
779 if (FLAGS_progress_reports) {
780 if (done_ >= next_report_) {
781 if (next_report_ < 1000) next_report_ += 100;
782 else if (next_report_ < 5000) next_report_ += 500;
783 else if (next_report_ < 10000) next_report_ += 1000;
784 else if (next_report_ < 50000) next_report_ += 5000;
785 else if (next_report_ < 100000) next_report_ += 10000;
786 else if (next_report_ < 500000) next_report_ += 50000;
787 else next_report_ += 100000;
788 fprintf(stdout, "... finished %ld ops%30s\r", done_, "");
789 }
790 }
791 }
792
793 void AddBytesForWrites(int nwrites, size_t nbytes) {
794 writes_ += nwrites;
795 bytes_ += nbytes;
796 }
797
798 void AddGets(int ngets, int nfounds) {
799 founds_ += nfounds;
800 gets_ += ngets;
801 }
802
803 void AddPrefixes(int nprefixes, int count) {
804 prefixes_ += nprefixes;
805 iterator_size_sums_ += count;
806 }
807
808 void AddIterations(int n) {
809 iterations_ += n;
810 }
811
812 void AddDeletes(int n) {
813 deletes_ += n;
814 }
815
816 void AddSingleDeletes(size_t n) { single_deletes_ += n; }
817
818 void AddRangeDeletions(int n) {
819 range_deletions_ += n;
820 }
821
822 void AddCoveredByRangeDeletions(int n) {
823 covered_by_range_deletions_ += n;
824 }
825
826 void AddErrors(int n) {
827 errors_ += n;
828 }
829
830 void AddNumCompactFilesSucceed(int n) { num_compact_files_succeed_ += n; }
831
832 void AddNumCompactFilesFailed(int n) { num_compact_files_failed_ += n; }
833
834 void Report(const char* name) {
835 std::string extra;
836 if (bytes_ < 1 || done_ < 1) {
837 fprintf(stderr, "No writes or ops?\n");
838 return;
839 }
840
841 double elapsed = (finish_ - start_) * 1e-6;
842 double bytes_mb = bytes_ / 1048576.0;
843 double rate = bytes_mb / elapsed;
844 double throughput = (double)done_/elapsed;
845
846 fprintf(stdout, "%-12s: ", name);
847 fprintf(stdout, "%.3f micros/op %ld ops/sec\n",
848 seconds_ * 1e6 / done_, (long)throughput);
849 fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
850 "", bytes_mb, rate, (100*writes_)/done_, done_);
851 fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_);
852 fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
853 fprintf(stdout, "%-12s: Single deleted %" ROCKSDB_PRIszt " times\n", "",
854 single_deletes_);
855 fprintf(stdout, "%-12s: %ld read and %ld found the key\n", "",
856 gets_, founds_);
857 fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_);
858 fprintf(stdout, "%-12s: Iterator size sum is %ld\n", "",
859 iterator_size_sums_);
860 fprintf(stdout, "%-12s: Iterated %ld times\n", "", iterations_);
861 fprintf(stdout, "%-12s: Deleted %ld key-ranges\n", "", range_deletions_);
862 fprintf(stdout, "%-12s: Range deletions covered %ld keys\n", "",
863 covered_by_range_deletions_);
864
865 fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
866 fprintf(stdout, "%-12s: %ld CompactFiles() succeed\n", "",
867 num_compact_files_succeed_);
868 fprintf(stdout, "%-12s: %ld CompactFiles() did not succeed\n", "",
869 num_compact_files_failed_);
870
871 if (FLAGS_histogram) {
872 fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
873 }
874 fflush(stdout);
875 }
876 };
877
878 // State shared by all concurrent executions of the same benchmark.
879 class SharedState {
880 public:
881 // indicates a key may have any value (or not be present) as an operation on
882 // it is incomplete.
883 static const uint32_t UNKNOWN_SENTINEL;
884 // indicates a key should definitely be deleted
885 static const uint32_t DELETION_SENTINEL;
886
887 explicit SharedState(StressTest* stress_test)
888 : cv_(&mu_),
889 seed_(static_cast<uint32_t>(FLAGS_seed)),
890 max_key_(FLAGS_max_key),
891 log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
892 num_threads_(FLAGS_threads),
893 num_initialized_(0),
894 num_populated_(0),
895 vote_reopen_(0),
896 num_done_(0),
897 start_(false),
898 start_verify_(false),
899 should_stop_bg_thread_(false),
900 bg_thread_finished_(false),
901 stress_test_(stress_test),
902 verification_failure_(false),
903 no_overwrite_ids_(FLAGS_column_families),
904 values_(nullptr) {
905 // Pick random keys in each column family that will not experience
906 // overwrite
907
908 printf("Choosing random keys with no overwrite\n");
909 Random64 rnd(seed_);
910 // Start with the identity permutation. Subsequent iterations of
911 // for loop below will start with perm of previous for loop
912 int64_t *permutation = new int64_t[max_key_];
913 for (int64_t i = 0; i < max_key_; i++) {
914 permutation[i] = i;
915 }
916 // Now do the Knuth shuffle
917 int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
918 // Only need to figure out first num_no_overwrite_keys of permutation
919 no_overwrite_ids_.reserve(num_no_overwrite_keys);
920 for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
921 int64_t rand_index = i + rnd.Next() % (max_key_ - i);
922 // Swap i and rand_index;
923 int64_t temp = permutation[i];
924 permutation[i] = permutation[rand_index];
925 permutation[rand_index] = temp;
926 // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
927 // permutation
928 no_overwrite_ids_.insert(permutation[i]);
929 }
930 delete[] permutation;
931
932 size_t expected_values_size =
933 sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
934 bool values_init_needed = false;
935 Status status;
936 if (!FLAGS_expected_values_path.empty()) {
937 if (!std::atomic<uint32_t>{}.is_lock_free()) {
938 status = Status::InvalidArgument(
939 "Cannot use --expected_values_path on platforms without lock-free "
940 "std::atomic<uint32_t>");
941 }
942 if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
943 status = Status::InvalidArgument(
944 "Cannot use --expected_values_path on when "
945 "--clear_column_family_one_in is greater than zero.");
946 }
947 uint64_t size = 0;
948 if (status.ok()) {
949 status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size);
950 }
951 unique_ptr<WritableFile> wfile;
952 if (status.ok() && size == 0) {
953 const EnvOptions soptions;
954 status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile,
955 soptions);
956 }
957 if (status.ok() && size == 0) {
958 std::string buf(expected_values_size, '\0');
959 status = wfile->Append(buf);
960 values_init_needed = true;
961 }
962 if (status.ok()) {
963 status = FLAGS_env->NewMemoryMappedFileBuffer(
964 FLAGS_expected_values_path, &expected_mmap_buffer_);
965 }
966 if (status.ok()) {
967 assert(expected_mmap_buffer_->GetLen() == expected_values_size);
968 values_ =
969 static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->GetBase());
970 assert(values_ != nullptr);
971 } else {
972 fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
973 FLAGS_expected_values_path.c_str(), status.ToString().c_str());
974 assert(values_ == nullptr);
975 }
976 }
977 if (values_ == nullptr) {
978 values_allocation_.reset(
979 new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
980 values_ = &values_allocation_[0];
981 values_init_needed = true;
982 }
983 assert(values_ != nullptr);
984 if (values_init_needed) {
985 for (int i = 0; i < FLAGS_column_families; ++i) {
986 for (int j = 0; j < max_key_; ++j) {
987 Delete(i, j, false /* pending */);
988 }
989 }
990 }
991
992 if (FLAGS_test_batches_snapshots) {
993 fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
994 return;
995 }
996
997 long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
998 if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
999 num_locks++;
1000 }
1001 fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
1002 key_locks_.resize(FLAGS_column_families);
1003
1004 for (int i = 0; i < FLAGS_column_families; ++i) {
1005 key_locks_[i].resize(num_locks);
1006 for (auto& ptr : key_locks_[i]) {
1007 ptr.reset(new port::Mutex);
1008 }
1009 }
1010 }
1011
1012 ~SharedState() {}
1013
1014 port::Mutex* GetMutex() {
1015 return &mu_;
1016 }
1017
1018 port::CondVar* GetCondVar() {
1019 return &cv_;
1020 }
1021
1022 StressTest* GetStressTest() const {
1023 return stress_test_;
1024 }
1025
1026 int64_t GetMaxKey() const {
1027 return max_key_;
1028 }
1029
1030 uint32_t GetNumThreads() const {
1031 return num_threads_;
1032 }
1033
1034 void IncInitialized() {
1035 num_initialized_++;
1036 }
1037
1038 void IncOperated() {
1039 num_populated_++;
1040 }
1041
1042 void IncDone() {
1043 num_done_++;
1044 }
1045
1046 void IncVotedReopen() {
1047 vote_reopen_ = (vote_reopen_ + 1) % num_threads_;
1048 }
1049
1050 bool AllInitialized() const {
1051 return num_initialized_ >= num_threads_;
1052 }
1053
1054 bool AllOperated() const {
1055 return num_populated_ >= num_threads_;
1056 }
1057
1058 bool AllDone() const {
1059 return num_done_ >= num_threads_;
1060 }
1061
1062 bool AllVotedReopen() {
1063 return (vote_reopen_ == 0);
1064 }
1065
1066 void SetStart() {
1067 start_ = true;
1068 }
1069
1070 void SetStartVerify() {
1071 start_verify_ = true;
1072 }
1073
1074 bool Started() const {
1075 return start_;
1076 }
1077
1078 bool VerifyStarted() const {
1079 return start_verify_;
1080 }
1081
1082 void SetVerificationFailure() { verification_failure_.store(true); }
1083
1084 bool HasVerificationFailedYet() { return verification_failure_.load(); }
1085
1086 port::Mutex* GetMutexForKey(int cf, int64_t key) {
1087 return key_locks_[cf][key >> log2_keys_per_lock_].get();
1088 }
1089
1090 void LockColumnFamily(int cf) {
1091 for (auto& mutex : key_locks_[cf]) {
1092 mutex->Lock();
1093 }
1094 }
1095
1096 void UnlockColumnFamily(int cf) {
1097 for (auto& mutex : key_locks_[cf]) {
1098 mutex->Unlock();
1099 }
1100 }
1101
1102 std::atomic<uint32_t>& Value(int cf, int64_t key) const {
1103 return values_[cf * max_key_ + key];
1104 }
1105
1106 void ClearColumnFamily(int cf) {
1107 std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
1108 DELETION_SENTINEL);
1109 }
1110
1111 // @param pending True if the update may have started but is not yet
1112 // guaranteed finished. This is useful for crash-recovery testing when the
1113 // process may crash before updating the expected values array.
1114 void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
1115 if (!pending) {
1116 // prevent expected-value update from reordering before Write
1117 std::atomic_thread_fence(std::memory_order_release);
1118 }
1119 Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
1120 std::memory_order_relaxed);
1121 if (pending) {
1122 // prevent Write from reordering before expected-value update
1123 std::atomic_thread_fence(std::memory_order_release);
1124 }
1125 }
1126
1127 uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }
1128
1129 // @param pending See comment above Put()
1130 // Returns true if the key was not yet deleted.
1131 bool Delete(int cf, int64_t key, bool pending) {
1132 if (Value(cf, key) == DELETION_SENTINEL) {
1133 return false;
1134 }
1135 Put(cf, key, DELETION_SENTINEL, pending);
1136 return true;
1137 }
1138
1139 // @param pending See comment above Put()
1140 // Returns true if the key was not yet deleted.
1141 bool SingleDelete(int cf, int64_t key, bool pending) {
1142 return Delete(cf, key, pending);
1143 }
1144
1145 // @param pending See comment above Put()
1146 // Returns number of keys deleted by the call.
1147 int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
1148 int covered = 0;
1149 for (int64_t key = begin_key; key < end_key; ++key) {
1150 if (Delete(cf, key, pending)) {
1151 ++covered;
1152 }
1153 }
1154 return covered;
1155 }
1156
1157 bool AllowsOverwrite(int64_t key) {
1158 return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
1159 }
1160
1161 bool Exists(int cf, int64_t key) {
1162 // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
1163 // is disallowed can't be accidentally added a second time, in which case
1164 // SingleDelete wouldn't be able to properly delete the key. It does allow
1165 // the case where a SingleDelete might be added which covers nothing, but
1166 // that's not a correctness issue.
1167 uint32_t expected_value = Value(cf, key).load();
1168 return expected_value != DELETION_SENTINEL;
1169 }
1170
1171 uint32_t GetSeed() const { return seed_; }
1172
1173 void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
1174
1175 bool ShoudStopBgThread() { return should_stop_bg_thread_; }
1176
1177 void SetBgThreadFinish() { bg_thread_finished_ = true; }
1178
1179 bool BgThreadFinished() const { return bg_thread_finished_; }
1180
1181 bool ShouldVerifyAtBeginning() const {
1182 return expected_mmap_buffer_.get() != nullptr;
1183 }
1184
1185 private:
1186 port::Mutex mu_;
1187 port::CondVar cv_;
1188 const uint32_t seed_;
1189 const int64_t max_key_;
1190 const uint32_t log2_keys_per_lock_;
1191 const int num_threads_;
1192 long num_initialized_;
1193 long num_populated_;
1194 long vote_reopen_;
1195 long num_done_;
1196 bool start_;
1197 bool start_verify_;
1198 bool should_stop_bg_thread_;
1199 bool bg_thread_finished_;
1200 StressTest* stress_test_;
1201 std::atomic<bool> verification_failure_;
1202
1203 // Keys that should not be overwritten
1204 std::unordered_set<size_t> no_overwrite_ids_;
1205
1206 std::atomic<uint32_t>* values_;
1207 std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
1208 // Has to make it owned by a smart ptr as port::Mutex is not copyable
1209 // and storing it in the container may require copying depending on the impl.
1210 std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
1211 std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
1212 };
1213
// Definitions for the sentinels declared in SharedState; both sit at the top
// of the uint32_t range (see the declarations for their meaning).
const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff;
1216
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid; // 0..n-1
  Random rand; // Has different seeds for different threads
  SharedState* shared;
  Stats stats;
  // Everything recorded about a Get() performed at a snapshot, so the same
  // read can be replayed later (see StressTest::AssertSame) to check
  // snapshot consistency.
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // optional state of all keys in the db
    // (raw owning pointer; the code popping snapshot_queue deletes it)
    std::vector<bool> *key_vec;
  };
  // Pending snapshot reads, each paired with a uint64_t counter — presumably
  // the op index at which the entry is re-checked/released; confirm against
  // OperateDb.
  std::queue<std::pair<uint64_t, SnapshotState> > snapshot_queue;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
1243
1244 class DbStressListener : public EventListener {
1245 public:
1246 DbStressListener(const std::string& db_name,
1247 const std::vector<DbPath>& db_paths,
1248 const std::vector<ColumnFamilyDescriptor>& column_families)
1249 : db_name_(db_name),
1250 db_paths_(db_paths),
1251 column_families_(column_families),
1252 num_pending_file_creations_(0) {}
1253 virtual ~DbStressListener() {
1254 assert(num_pending_file_creations_ == 0);
1255 }
1256 #ifndef ROCKSDB_LITE
1257 virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
1258 assert(IsValidColumnFamilyName(info.cf_name));
1259 VerifyFilePath(info.file_path);
1260 // pretending doing some work here
1261 std::this_thread::sleep_for(
1262 std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
1263 }
1264
1265 virtual void OnCompactionCompleted(DB* /*db*/,
1266 const CompactionJobInfo& ci) override {
1267 assert(IsValidColumnFamilyName(ci.cf_name));
1268 assert(ci.input_files.size() + ci.output_files.size() > 0U);
1269 for (const auto& file_path : ci.input_files) {
1270 VerifyFilePath(file_path);
1271 }
1272 for (const auto& file_path : ci.output_files) {
1273 VerifyFilePath(file_path);
1274 }
1275 // pretending doing some work here
1276 std::this_thread::sleep_for(
1277 std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
1278 }
1279
1280 virtual void OnTableFileCreationStarted(
1281 const TableFileCreationBriefInfo& /*info*/) override {
1282 ++num_pending_file_creations_;
1283 }
1284 virtual void OnTableFileCreated(const TableFileCreationInfo& info) override {
1285 assert(info.db_name == db_name_);
1286 assert(IsValidColumnFamilyName(info.cf_name));
1287 if (info.file_size) {
1288 VerifyFilePath(info.file_path);
1289 }
1290 assert(info.job_id > 0 || FLAGS_compact_files_one_in > 0);
1291 if (info.status.ok() && info.file_size > 0) {
1292 assert(info.table_properties.data_size > 0);
1293 assert(info.table_properties.raw_key_size > 0);
1294 assert(info.table_properties.num_entries > 0);
1295 }
1296 --num_pending_file_creations_;
1297 }
1298
1299 protected:
1300 bool IsValidColumnFamilyName(const std::string& cf_name) const {
1301 if (cf_name == kDefaultColumnFamilyName) {
1302 return true;
1303 }
1304 // The column family names in the stress tests are numbers.
1305 for (size_t i = 0; i < cf_name.size(); ++i) {
1306 if (cf_name[i] < '0' || cf_name[i] > '9') {
1307 return false;
1308 }
1309 }
1310 return true;
1311 }
1312
1313 void VerifyFileDir(const std::string& file_dir) {
1314 #ifndef NDEBUG
1315 if (db_name_ == file_dir) {
1316 return;
1317 }
1318 for (const auto& db_path : db_paths_) {
1319 if (db_path.path == file_dir) {
1320 return;
1321 }
1322 }
1323 for (auto& cf : column_families_) {
1324 for (const auto& cf_path : cf.options.cf_paths) {
1325 if (cf_path.path == file_dir) {
1326 return;
1327 }
1328 }
1329 }
1330 assert(false);
1331 #else
1332 (void)file_dir;
1333 #endif // !NDEBUG
1334 }
1335
1336 void VerifyFileName(const std::string& file_name) {
1337 #ifndef NDEBUG
1338 uint64_t file_number;
1339 FileType file_type;
1340 bool result = ParseFileName(file_name, &file_number, &file_type);
1341 assert(result);
1342 assert(file_type == kTableFile);
1343 #else
1344 (void)file_name;
1345 #endif // !NDEBUG
1346 }
1347
1348 void VerifyFilePath(const std::string& file_path) {
1349 #ifndef NDEBUG
1350 size_t pos = file_path.find_last_of("/");
1351 if (pos == std::string::npos) {
1352 VerifyFileName(file_path);
1353 } else {
1354 if (pos > 0) {
1355 VerifyFileDir(file_path.substr(0, pos));
1356 }
1357 VerifyFileName(file_path.substr(pos));
1358 }
1359 #else
1360 (void)file_path;
1361 #endif // !NDEBUG
1362 }
1363 #endif // !ROCKSDB_LITE
1364
1365 private:
1366 std::string db_name_;
1367 std::vector<DbPath> db_paths_;
1368 std::vector<ColumnFamilyDescriptor> column_families_;
1369 std::atomic<int> num_pending_file_creations_;
1370 };
1371
1372 } // namespace
1373
1374 class StressTest {
1375 public:
1376 StressTest()
1377 : cache_(NewCache(FLAGS_cache_size)),
1378 compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
1379 filter_policy_(FLAGS_bloom_bits >= 0
1380 ? FLAGS_use_block_based_filter
1381 ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
1382 : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
1383 : nullptr),
1384 db_(nullptr),
1385 #ifndef ROCKSDB_LITE
1386 txn_db_(nullptr),
1387 #endif
1388 new_column_family_name_(1),
1389 num_times_reopened_(0) {
1390 if (FLAGS_destroy_db_initially) {
1391 std::vector<std::string> files;
1392 FLAGS_env->GetChildren(FLAGS_db, &files);
1393 for (unsigned int i = 0; i < files.size(); i++) {
1394 if (Slice(files[i]).starts_with("heap-")) {
1395 FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
1396 }
1397 }
1398 DestroyDB(FLAGS_db, Options());
1399 }
1400 }
1401
1402 virtual ~StressTest() {
1403 for (auto cf : column_families_) {
1404 delete cf;
1405 }
1406 column_families_.clear();
1407 delete db_;
1408 }
1409
1410 std::shared_ptr<Cache> NewCache(size_t capacity) {
1411 if (capacity <= 0) {
1412 return nullptr;
1413 }
1414 if (FLAGS_use_clock_cache) {
1415 auto cache = NewClockCache((size_t)capacity);
1416 if (!cache) {
1417 fprintf(stderr, "Clock cache not supported.");
1418 exit(1);
1419 }
1420 return cache;
1421 } else {
1422 return NewLRUCache((size_t)capacity);
1423 }
1424 }
1425
  // Populate options_table_ — a map from dynamically-settable option name to
  // a few candidate values derived from the current options_ — and
  // options_index_ with the option names, for random selection by
  // SetOptions(). No-op unless --set_options_one_in is positive. Always
  // returns true.
  bool BuildOptionsTable() {
    if (FLAGS_set_options_one_in <= 0) {
      return true;
    }

    std::unordered_map<std::string, std::vector<std::string> > options_tbl = {
        {"write_buffer_size",
         {ToString(options_.write_buffer_size),
          ToString(options_.write_buffer_size * 2),
          ToString(options_.write_buffer_size * 4)}},
        {"max_write_buffer_number",
         {ToString(options_.max_write_buffer_number),
          ToString(options_.max_write_buffer_number * 2),
          ToString(options_.max_write_buffer_number * 4)}},
        {"arena_block_size",
         {
             ToString(options_.arena_block_size),
             ToString(options_.write_buffer_size / 4),
             ToString(options_.write_buffer_size / 8),
         }},
        {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
        {"max_successive_merges", {"0", "2", "4"}},
        {"inplace_update_num_locks", {"100", "200", "300"}},
        // TODO(ljin): enable test for this option
        // {"disable_auto_compactions", {"100", "200", "300"}},
        {"soft_rate_limit", {"0", "0.5", "0.9"}},
        {"hard_rate_limit", {"0", "1.1", "2.0"}},
        // The three level0 triggers share index positions so SetOptions()
        // can change them together consistently.
        {"level0_file_num_compaction_trigger",
         {
             ToString(options_.level0_file_num_compaction_trigger),
             ToString(options_.level0_file_num_compaction_trigger + 2),
             ToString(options_.level0_file_num_compaction_trigger + 4),
         }},
        {"level0_slowdown_writes_trigger",
         {
             ToString(options_.level0_slowdown_writes_trigger),
             ToString(options_.level0_slowdown_writes_trigger + 2),
             ToString(options_.level0_slowdown_writes_trigger + 4),
         }},
        {"level0_stop_writes_trigger",
         {
             ToString(options_.level0_stop_writes_trigger),
             ToString(options_.level0_stop_writes_trigger + 2),
             ToString(options_.level0_stop_writes_trigger + 4),
         }},
        {"max_compaction_bytes",
         {
             ToString(options_.target_file_size_base * 5),
             ToString(options_.target_file_size_base * 15),
             ToString(options_.target_file_size_base * 100),
         }},
        {"target_file_size_base",
         {
             ToString(options_.target_file_size_base),
             ToString(options_.target_file_size_base * 2),
             ToString(options_.target_file_size_base * 4),
         }},
        {"target_file_size_multiplier",
         {
             ToString(options_.target_file_size_multiplier), "1", "2",
         }},
        {"max_bytes_for_level_base",
         {
             ToString(options_.max_bytes_for_level_base / 2),
             ToString(options_.max_bytes_for_level_base),
             ToString(options_.max_bytes_for_level_base * 2),
         }},
        {"max_bytes_for_level_multiplier",
         {
             ToString(options_.max_bytes_for_level_multiplier), "1", "2",
         }},
        {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
    };

    options_table_ = std::move(options_tbl);

    for (const auto& iter : options_table_) {
      options_index_.push_back(iter.first);
    }
    return true;
  }
1507
  // Top-level driver: opens the DB, spawns FLAGS_threads workers (plus an
  // optional background thread-pool-size fuzzer), shepherds them through the
  // init -> operate -> verify -> done phases via SharedState's mutex/condvar,
  // merges and reports stats, and returns false iff verification failed.
  bool Run() {
    uint64_t now = FLAGS_env->NowMicros();
    fprintf(stdout, "%s Initializing db_stress\n",
            FLAGS_env->TimeToString(now / 1000000).c_str());
    PrintEnv();
    Open();
    BuildOptionsTable();
    SharedState shared(this);
    uint32_t n = shared.GetNumThreads();

    now = FLAGS_env->NowMicros();
    fprintf(stdout, "%s Initializing worker threads\n",
            FLAGS_env->TimeToString(now / 1000000).c_str());
    std::vector<ThreadState*> threads(n);
    for (uint32_t i = 0; i < n; i++) {
      threads[i] = new ThreadState(i, &shared);
      FLAGS_env->StartThread(ThreadBody, threads[i]);
    }
    ThreadState bg_thread(0, &shared);
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
    }

    // Each thread goes through the following states:
    // initializing -> wait for others to init -> read/populate/depopulate
    // wait for others to operate -> verify -> done

    {
      MutexLock l(shared.GetMutex());
      while (!shared.AllInitialized()) {
        shared.GetCondVar()->Wait();
      }
      // When resuming from an expected-values file, each thread verified
      // the DB against it during its init phase (crash-recovery check).
      if (shared.ShouldVerifyAtBeginning()) {
        if (shared.HasVerificationFailedYet()) {
          printf("Crash-recovery verification failed :(\n");
        } else {
          printf("Crash-recovery verification passed :)\n");
        }
      }

      now = FLAGS_env->NowMicros();
      fprintf(stdout, "%s Starting database operations\n",
              FLAGS_env->TimeToString(now/1000000).c_str());

      shared.SetStart();
      shared.GetCondVar()->SignalAll();
      while (!shared.AllOperated()) {
        shared.GetCondVar()->Wait();
      }

      now = FLAGS_env->NowMicros();
      if (FLAGS_test_batches_snapshots) {
        fprintf(stdout, "%s Limited verification already done during gets\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
      } else {
        fprintf(stdout, "%s Starting verification\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
      }

      shared.SetStartVerify();
      shared.GetCondVar()->SignalAll();
      while (!shared.AllDone()) {
        shared.GetCondVar()->Wait();
      }
    }

    // All workers are done; fold their stats into thread 0's and report.
    for (unsigned int i = 1; i < n; i++) {
      threads[0]->stats.Merge(threads[i]->stats);
    }
    threads[0]->stats.Report("Stress Test");

    for (unsigned int i = 0; i < n; i++) {
      delete threads[i];
      threads[i] = nullptr;
    }
    now = FLAGS_env->NowMicros();
    if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
      fprintf(stdout, "%s Verification successful\n",
              FLAGS_env->TimeToString(now/1000000).c_str());
    }
    PrintStatistics();

    // Stop the background pool-size fuzzer and wait for it to exit before
    // `shared` (which it references) goes out of scope.
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      MutexLock l(shared.GetMutex());
      shared.SetShouldStopBgThread();
      while (!shared.BgThreadFinished()) {
        shared.GetCondVar()->Wait();
      }
    }

    if (shared.HasVerificationFailedYet()) {
      printf("Verification failed :(\n");
      return false;
    }
    return true;
  }
1604
1605 protected:
  // Entry point for each worker thread. Rendezvous with the main thread
  // (see Run()) happens at each phase boundary through SharedState's
  // mutex/condvar: initialize -> operate -> verify -> done.
  static void ThreadBody(void* v) {
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
    SharedState* shared = thread->shared;

    // Optional crash-recovery check against the expected-values file,
    // performed before this thread counts as initialized.
    if (shared->ShouldVerifyAtBeginning()) {
      thread->shared->GetStressTest()->VerifyDb(thread);
    }
    {
      MutexLock l(shared->GetMutex());
      shared->IncInitialized();
      if (shared->AllInitialized()) {
        // Wake the main thread, which is waiting for all workers to init.
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->Started()) {
        shared->GetCondVar()->Wait();
      }
    }
    thread->shared->GetStressTest()->OperateDb(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncOperated();
      if (shared->AllOperated()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->VerifyStarted()) {
        shared->GetCondVar()->Wait();
      }
    }

    thread->shared->GetStressTest()->VerifyDb(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncDone();
      if (shared->AllDone()) {
        shared->GetCondVar()->SignalAll();
      }
    }
  }
1646
  // Background thread that periodically resizes the compaction thread pool
  // to a random size in [base - variations, base + variations], clamped to
  // >= 1, until SharedState asks it to stop.
  static void PoolSizeChangeThread(void* v) {
    assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
    SharedState* shared = thread->shared;

    while (true) {
      {
        MutexLock l(shared->GetMutex());
        if (shared->ShoudStopBgThread()) {
          shared->SetBgThreadFinish();
          shared->GetCondVar()->SignalAll();
          return;
        }
      }

      auto thread_pool_size_base = FLAGS_max_background_compactions;
      auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
      int new_thread_pool_size =
          thread_pool_size_base - thread_pool_size_var +
          thread->rand.Next() % (thread_pool_size_var * 2 + 1);
      if (new_thread_pool_size < 1) {
        new_thread_pool_size = 1;
      }
      FLAGS_env->SetBackgroundThreads(new_thread_pool_size);
      // Sleep a random duration of up to
      // FLAGS_compaction_thread_pool_adjust_interval milliseconds before the
      // next adjustment ("%" binds as tightly as "*", so the modulo applies
      // to Next() before the ms->us scaling).
      FLAGS_env->SleepForMicroseconds(
          thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
              1000 +
          1);
    }
  }
1678
1679 static void PrintKeyValue(int cf, uint64_t key, const char* value,
1680 size_t sz) {
1681 if (!FLAGS_verbose) {
1682 return;
1683 }
1684 std::string tmp;
1685 tmp.reserve(sz * 2 + 16);
1686 char buf[4];
1687 for (size_t i = 0; i < sz; i++) {
1688 snprintf(buf, 4, "%X", value[i]);
1689 tmp.append(buf);
1690 }
1691 fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf,
1692 key, sz, tmp.c_str());
1693 }
1694
1695 static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
1696 const double completed_ratio =
1697 static_cast<double>(iteration) / FLAGS_ops_per_thread;
1698 const int64_t base_key = static_cast<int64_t>(
1699 completed_ratio * (FLAGS_max_key - FLAGS_active_width));
1700 return base_key + thread->rand.Next() % FLAGS_active_width;
1701 }
1702
1703 static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
1704 size_t value_sz =
1705 ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
1706 assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
1707 (void) max_sz;
1708 *((uint32_t*)v) = rand;
1709 for (size_t i=sizeof(uint32_t); i < value_sz; i++) {
1710 v[i] = (char)(rand ^ i);
1711 }
1712 v[value_sz] = '\0';
1713 return value_sz; // the size of the value set.
1714 }
1715
  // Re-read snap_state.key from `cf` at snap_state.snapshot and verify that
  // status, value and (when snap_state.key_vec is set) the full key bitmap
  // all match what was recorded when the snapshot was taken. Returns OK on
  // match or when `cf` is not the snapshot's column family; Corruption on
  // mismatch; otherwise the read error.
  Status AssertSame(DB* db, ColumnFamilyHandle* cf,
                    ThreadState::SnapshotState& snap_state) {
    Status s;
    // Only the column family the original read was done on is checked.
    if (cf->GetName() != snap_state.cf_at_name) {
      return s;
    }
    ReadOptions ropt;
    ropt.snapshot = snap_state.snapshot;
    PinnableSlice exp_v(&snap_state.value);
    exp_v.PinSelf();
    PinnableSlice v;
    s = db->Get(ropt, cf, snap_state.key, &v);
    if (!s.ok() && !s.IsNotFound()) {
      return s;
    }
    if (snap_state.status != s) {
      return Status::Corruption(
          "The snapshot gave inconsistent results for key " +
          ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
          " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
          ") vs. (" + s.ToString() + ")");
    }
    if (s.ok()) {
      if (exp_v != v) {
        return Status::Corruption("The snapshot gave inconsistent values: (" +
                                  exp_v.ToString() + ") vs. (" + v.ToString() +
                                  ")");
      }
    }
    if (snap_state.key_vec != nullptr) {
      // Rebuild the present-keys bitmap by iterating at the snapshot and
      // compare it with the bitmap captured earlier.
      std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
      std::unique_ptr<std::vector<bool>> tmp_bitvec(new std::vector<bool>(FLAGS_max_key));
      for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
        uint64_t key_val;
        if (GetIntVal(iterator->key().ToString(), &key_val)) {
          (*tmp_bitvec.get())[key_val] = true;
        }
      }
      if (!std::equal(snap_state.key_vec->begin(),
                      snap_state.key_vec->end(),
                      tmp_bitvec.get()->begin())) {
        return Status::Corruption("Found inconsistent keys at this snapshot");
      }
    }
    return Status::OK();
  }
1762
1763 Status SetOptions(ThreadState* thread) {
1764 assert(FLAGS_set_options_one_in > 0);
1765 std::unordered_map<std::string, std::string> opts;
1766 std::string name = options_index_[
1767 thread->rand.Next() % options_index_.size()];
1768 int value_idx = thread->rand.Next() % options_table_[name].size();
1769 if (name == "soft_rate_limit" || name == "hard_rate_limit") {
1770 opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
1771 opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
1772 } else if (name == "level0_file_num_compaction_trigger" ||
1773 name == "level0_slowdown_writes_trigger" ||
1774 name == "level0_stop_writes_trigger") {
1775 opts["level0_file_num_compaction_trigger"] =
1776 options_table_["level0_file_num_compaction_trigger"][value_idx];
1777 opts["level0_slowdown_writes_trigger"] =
1778 options_table_["level0_slowdown_writes_trigger"][value_idx];
1779 opts["level0_stop_writes_trigger"] =
1780 options_table_["level0_stop_writes_trigger"][value_idx];
1781 } else {
1782 opts[name] = options_table_[name][value_idx];
1783 }
1784
1785 int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
1786 auto cfh = column_families_[rand_cf_idx];
1787 return db_->SetOptions(cfh, opts);
1788 }
1789
1790 #ifndef ROCKSDB_LITE
1791 Status NewTxn(WriteOptions& write_opts, Transaction** txn) {
1792 if (!FLAGS_use_txn) {
1793 return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
1794 }
1795 static std::atomic<uint64_t> txn_id = {0};
1796 TransactionOptions txn_options;
1797 *txn = txn_db_->BeginTransaction(write_opts, txn_options);
1798 auto istr = std::to_string(txn_id.fetch_add(1));
1799 Status s = (*txn)->SetName("xid" + istr);
1800 return s;
1801 }
1802
1803 Status CommitTxn(Transaction* txn) {
1804 if (!FLAGS_use_txn) {
1805 return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
1806 }
1807 Status s = txn->Prepare();
1808 if (s.ok()) {
1809 s = txn->Commit();
1810 }
1811 delete txn;
1812 return s;
1813 }
1814 #endif
1815
  // Main per-thread workload loop: runs FLAGS_ops_per_thread randomized
  // operations (get / prefix scan / put / delete / delete-range / iterate)
  // interleaved with occasional flushes, manual compactions, checkpoints,
  // backups, snapshot acquisition/verification, and whole-DB reopens.
  virtual void OperateDb(ThreadState* thread) {
    ReadOptions read_opts(FLAGS_verify_checksum, true);
    WriteOptions write_opts;
    auto shared = thread->shared;
    char value[100];
    std::string from_db;
    if (FLAGS_sync) {
      write_opts.sync = true;
    }
    write_opts.disableWAL = FLAGS_disable_wal;
    // Cumulative probability bounds used to pick one operation from a
    // uniform draw in [0, 100); the remainder above delRangeBound iterates.
    const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent;
    const int writeBound = prefixBound + (int)FLAGS_writepercent;
    const int delBound = writeBound + (int)FLAGS_delpercent;
    const int delRangeBound = delBound + (int)FLAGS_delrangepercent;

    thread->stats.Start();
    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }
      // Periodically rendezvous all threads and reopen the DB
      // (FLAGS_reopen times over the whole run).
      if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) {
        {
          thread->stats.FinishedSingleOp();
          MutexLock l(thread->shared->GetMutex());
          // Release every held snapshot before the reopen.
          while (!thread->snapshot_queue.empty()) {
            db_->ReleaseSnapshot(
                thread->snapshot_queue.front().second.snapshot);
            delete thread->snapshot_queue.front().second.key_vec;
            thread->snapshot_queue.pop();
          }
          thread->shared->IncVotedReopen();
          if (thread->shared->AllVotedReopen()) {
            // Last voter performs the reopen and wakes the waiters.
            thread->shared->GetStressTest()->Reopen();
            thread->shared->GetCondVar()->SignalAll();
          }
          else {
            thread->shared->GetCondVar()->Wait();
          }
          // Commenting this out as we don't want to reset stats on each open.
          // thread->stats.Start();
        }
      }

      // Change Options
      if (FLAGS_set_options_one_in > 0 &&
          thread->rand.OneIn(FLAGS_set_options_one_in)) {
        SetOptions(thread);
      }

      if (FLAGS_set_in_place_one_in > 0 &&
          thread->rand.OneIn(FLAGS_set_in_place_one_in)) {
        // NOTE(review): `x ^= x` always evaluates to false, so this can only
        // clear inplace_update_support, never toggle it on. Likely intended
        // `^= true` (or `= !...`) -- confirm before relying on this flag.
        options_.inplace_update_support ^= options_.inplace_update_support;
      }

      MaybeClearOneColumnFamily(thread);

#ifndef ROCKSDB_LITE
      // Occasionally create a checkpoint in a scratch directory, list its
      // files, and destroy it again.
      if (FLAGS_checkpoint_one_in > 0 &&
          thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
        std::string checkpoint_dir =
            FLAGS_db + "/.checkpoint" + ToString(thread->tid);
        DestroyDB(checkpoint_dir, Options());
        Checkpoint* checkpoint;
        Status s = Checkpoint::Create(db_, &checkpoint);
        if (s.ok()) {
          s = checkpoint->CreateCheckpoint(checkpoint_dir);
        }
        std::vector<std::string> files;
        if (s.ok()) {
          s = FLAGS_env->GetChildren(checkpoint_dir, &files);
        }
        DestroyDB(checkpoint_dir, Options());
        delete checkpoint;
        if (!s.ok()) {
          printf("A checkpoint operation failed with: %s\n",
                 s.ToString().c_str());
        }
      }

      // Occasionally take a backup and then purge all backups.
      if (FLAGS_backup_one_in > 0 &&
          thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
        std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
        BackupableDBOptions backup_opts(backup_dir);
        BackupEngine* backup_engine = nullptr;
        Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
        if (s.ok()) {
          s = backup_engine->CreateNewBackup(db_);
        }
        if (s.ok()) {
          s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
        }
        if (!s.ok()) {
          printf("A BackupEngine operation failed with: %s\n",
                 s.ToString().c_str());
        }
        if (backup_engine != nullptr) {
          delete backup_engine;
        }
      }

      if (FLAGS_compact_files_one_in > 0 &&
          thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
        auto* random_cf =
            column_families_[thread->rand.Next() % FLAGS_column_families];
        rocksdb::ColumnFamilyMetaData cf_meta_data;
        db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data);

        // Randomly compact up to three consecutive files from a level
        const int kMaxRetry = 3;
        for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
          size_t random_level = thread->rand.Uniform(
              static_cast<int>(cf_meta_data.levels.size()));

          const auto& files = cf_meta_data.levels[random_level].files;
          if (files.size() > 0) {
            size_t random_file_index =
                thread->rand.Uniform(static_cast<int>(files.size()));
            if (files[random_file_index].being_compacted) {
              // Retry as the selected file is currently being compacted
              continue;
            }

            // Include the immediate neighbors when they are not already
            // being compacted.
            std::vector<std::string> input_files;
            input_files.push_back(files[random_file_index].name);
            if (random_file_index > 0 &&
                !files[random_file_index - 1].being_compacted) {
              input_files.push_back(files[random_file_index - 1].name);
            }
            if (random_file_index + 1 < files.size() &&
                !files[random_file_index + 1].being_compacted) {
              input_files.push_back(files[random_file_index + 1].name);
            }

            size_t output_level =
                std::min(random_level + 1, cf_meta_data.levels.size() - 1);
            auto s =
                db_->CompactFiles(CompactionOptions(), random_cf, input_files,
                                  static_cast<int>(output_level));
            if (!s.ok()) {
              fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
                      s.ToString().c_str());
              thread->stats.AddNumCompactFilesFailed(1);
            } else {
              thread->stats.AddNumCompactFilesSucceed(1);
            }
            break;
          }
        }
      }
#endif  // !ROCKSDB_LITE
      int64_t rand_key = GenerateOneKey(thread, i);
      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      std::string keystr = Key(rand_key);
      Slice key = keystr;
      std::unique_ptr<MutexLock> lock;
      if (ShouldAcquireMutexOnKey()) {
        lock.reset(new MutexLock(
            shared->GetMutexForKey(rand_column_family, rand_key)));
      }

      auto column_family = column_families_[rand_column_family];

      if (FLAGS_flush_one_in > 0 &&
          thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
        FlushOptions flush_opts;
        Status status = db_->Flush(flush_opts, column_family);
        if (!status.ok()) {
          fprintf(stdout, "Unable to perform Flush(): %s\n", status.ToString().c_str());
        }
      }

      if (FLAGS_compact_range_one_in > 0 &&
          thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) {
        int64_t end_key_num;
        // Clamp to kMaxInt64 to avoid signed overflow when computing the
        // upper end of the compaction range.
        if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
          end_key_num = port::kMaxInt64;
        } else {
          end_key_num = FLAGS_compact_range_width + rand_key;
        }
        std::string end_key_buf = Key(end_key_num);
        Slice end_key(end_key_buf);

        CompactRangeOptions cro;
        cro.exclusive_manual_compaction =
            static_cast<bool>(thread->rand.Next() % 2);
        Status status = db_->CompactRange(cro, column_family, &key, &end_key);
        if (!status.ok()) {
          printf("Unable to perform CompactRange(): %s\n",
                 status.ToString().c_str());
        }
      }

      std::vector<int> rand_column_families =
          GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
      std::vector<int64_t> rand_keys = GenerateKeys(rand_key);

      if (FLAGS_ingest_external_file_one_in > 0 &&
          thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) {
        TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
      }

      if (FLAGS_acquire_snapshot_one_in > 0 &&
          thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
        auto snapshot = db_->GetSnapshot();
        ReadOptions ropt;
        ropt.snapshot = snapshot;
        std::string value_at;
        // When taking a snapshot, we also read a key from that snapshot. We
        // will later read the same key before releasing the snapshot and verify
        // that the results are the same.
        auto status_at = db_->Get(ropt, column_family, key, &value_at);
        std::vector<bool> *key_vec = nullptr;

        if (FLAGS_compare_full_db_state_snapshot &&
            (thread->tid == 0)) {
          // Record the full set of keys visible in this snapshot (thread 0
          // only) so AssertSame can later check the whole DB state.
          key_vec = new std::vector<bool>(FLAGS_max_key);
          std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
          for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
            uint64_t key_val;
            if (GetIntVal(iterator->key().ToString(), &key_val)) {
              (*key_vec)[key_val] = true;
            }
          }
        }

        ThreadState::SnapshotState snap_state = {
            snapshot, rand_column_family, column_family->GetName(),
            keystr, status_at, value_at, key_vec};
        // Hold the snapshot for FLAGS_snapshot_hold_ops operations, or until
        // this thread's last operation, whichever comes first.
        thread->snapshot_queue.emplace(
            std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
            snap_state);
      }
      while (!thread->snapshot_queue.empty() &&
             i == thread->snapshot_queue.front().first) {
        auto snap_state = thread->snapshot_queue.front().second;
        assert(snap_state.snapshot);
        // Note: this is unsafe as the cf might be dropped concurrently. But it
        // is ok since unclean cf drop is currently not supported by write
        // prepared transactions.
        Status s =
            AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
        if (!s.ok()) {
          VerificationAbort(shared, "Snapshot gave inconsistent state", s);
        }
        db_->ReleaseSnapshot(snap_state.snapshot);
        delete snap_state.key_vec;
        thread->snapshot_queue.pop();
      }

      // Pick this iteration's operation via the cumulative bounds above.
      int prob_op = thread->rand.Uniform(100);
      if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
        // OPERATION read
        TestGet(thread, read_opts, rand_column_families, rand_keys);
      } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
        // OPERATION prefix scan
        // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
        // prefix
        TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
      } else if (prefixBound <= prob_op && prob_op < writeBound) {
        // OPERATION write
        TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                value, lock);
      } else if (writeBound <= prob_op && prob_op < delBound) {
        // OPERATION delete
        TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
      } else if (delBound <= prob_op && prob_op < delRangeBound) {
        // OPERATION delete range
        TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
                        lock);
      } else {
        // OPERATION iterate
        TestIterate(thread, read_opts, rand_column_families, rand_keys);
      }
      thread->stats.FinishedSingleOp();
    }

    thread->stats.Stop();
  }
2096
  // Full-database verification pass; implemented by each test variant.
  virtual void VerifyDb(ThreadState* thread) const = 0;

  // Hook invoked once per operation; variants may drop/recreate a CF here.
  virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}

  // Whether OperateDb() must hold the per-key mutex around each operation.
  virtual bool ShouldAcquireMutexOnKey() const { return false; }

  // Column families an operation targets; default is just the random one.
  virtual std::vector<int> GenerateColumnFamilies(
      const int /* num_column_families */, int rand_column_family) const {
    return {rand_column_family};
  }

  // Keys an operation targets; default is just the random key.
  virtual std::vector<int64_t> GenerateKeys(int64_t rand_key) const {
    return {rand_key};
  }

  // Point lookup of rand_keys[0] in rand_column_families[0].
  virtual Status TestGet(ThreadState* thread,
                         const ReadOptions& read_opts,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys) = 0;

  // Scan all keys sharing the FLAGS_prefix_size-byte prefix of rand_keys[0].
  virtual Status TestPrefixScan(ThreadState* thread,
                                const ReadOptions& read_opts,
                                const std::vector<int>& rand_column_families,
                                const std::vector<int64_t>& rand_keys) = 0;

  // Write a value for the chosen key(s); `value` is scratch space for the
  // generated payload and `lock` holds the per-key mutex (may be re-seated).
  virtual Status TestPut(ThreadState* thread,
      WriteOptions& write_opts, const ReadOptions& read_opts,
      const std::vector<int>& cf_ids, const std::vector<int64_t>& keys,
      char (&value)[100], std::unique_ptr<MutexLock>& lock) = 0;

  // Delete the chosen key(s).
  virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                            const std::vector<int>& rand_column_families,
                            const std::vector<int64_t>& rand_keys,
                            std::unique_ptr<MutexLock>& lock) = 0;

  // Issue a DeleteRange anchored at the chosen key(s).
  virtual Status TestDeleteRange(ThreadState* thread,
                                 WriteOptions& write_opts,
                                 const std::vector<int>& rand_column_families,
                                 const std::vector<int64_t>& rand_keys,
                                 std::unique_ptr<MutexLock>& lock) = 0;

  // Ingest an externally built file covering the chosen key(s).
  virtual void TestIngestExternalFile(
      ThreadState* thread, const std::vector<int>& rand_column_families,
      const std::vector<int64_t>& rand_keys,
      std::unique_ptr<MutexLock>& lock) = 0;
2142
2143 // Given a key K, this creates an iterator which scans to K and then
2144 // does a random sequence of Next/Prev operations.
2145 virtual Status TestIterate(ThreadState* thread,
2146 const ReadOptions& read_opts,
2147 const std::vector<int>& rand_column_families,
2148 const std::vector<int64_t>& rand_keys) {
2149 Status s;
2150 const Snapshot* snapshot = db_->GetSnapshot();
2151 ReadOptions readoptionscopy = read_opts;
2152 readoptionscopy.snapshot = snapshot;
2153
2154 std::string upper_bound_str;
2155 Slice upper_bound;
2156 if (thread->rand.OneIn(16)) {
2157 // in 1/16 chance, set a iterator upper bound
2158 int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
2159 upper_bound_str = Key(rand_upper_key);
2160 upper_bound = Slice(upper_bound_str);
2161 // uppder_bound can be smaller than seek key, but the query itself
2162 // should not crash either.
2163 readoptionscopy.iterate_upper_bound = &upper_bound;
2164 }
2165 std::string lower_bound_str;
2166 Slice lower_bound;
2167 if (thread->rand.OneIn(16)) {
2168 // in 1/16 chance, set a iterator lower bound
2169 int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
2170 lower_bound_str = Key(rand_lower_key);
2171 lower_bound = Slice(lower_bound_str);
2172 // uppder_bound can be smaller than seek key, but the query itself
2173 // should not crash either.
2174 readoptionscopy.iterate_lower_bound = &lower_bound;
2175 }
2176
2177 auto cfh = column_families_[rand_column_families[0]];
2178 std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));
2179
2180 std::string key_str = Key(rand_keys[0]);
2181 Slice key = key_str;
2182 iter->Seek(key);
2183 for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
2184 if (thread->rand.OneIn(2)) {
2185 iter->Next();
2186 } else {
2187 iter->Prev();
2188 }
2189 }
2190
2191 if (s.ok()) {
2192 thread->stats.AddIterations(1);
2193 } else {
2194 thread->stats.AddErrors(1);
2195 }
2196
2197 db_->ReleaseSnapshot(snapshot);
2198
2199 return s;
2200 }
2201
2202 void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
2203 printf("Verification failed: %s. Status is %s\n", msg.c_str(),
2204 s.ToString().c_str());
2205 shared->SetVerificationFailure();
2206 }
2207
2208 void VerificationAbort(SharedState* shared, std::string msg, int cf,
2209 int64_t key) const {
2210 printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, key,
2211 msg.c_str());
2212 shared->SetVerificationFailure();
2213 }
2214
  // Prints the effective test configuration to stdout before the run starts.
  void PrintEnv() const {
    fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
            kMinorVersion);
    fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
    fprintf(stdout, "TransactionDB : %s\n",
            FLAGS_use_txn ? "true" : "false");
    fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
    if (!FLAGS_test_batches_snapshots) {
      fprintf(stdout, "Clear CFs one in : %d\n",
              FLAGS_clear_column_family_one_in);
    }
    fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
    fprintf(stdout, "Ops per thread : %lu\n",
            (unsigned long)FLAGS_ops_per_thread);
    std::string ttl_state("unused");
    if (FLAGS_ttl > 0) {
      ttl_state = NumberToString(FLAGS_ttl);
    }
    fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
    fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
    fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
    fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
    fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
    fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
    fprintf(stdout, "No overwrite percentage : %d%%\n",
            FLAGS_nooverwritepercent);
    fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
    fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
            FLAGS_db_write_buffer_size);
    fprintf(stdout, "Write-buffer-size : %d\n",
            FLAGS_write_buffer_size);
    fprintf(stdout, "Iterations : %lu\n",
            (unsigned long)FLAGS_num_iterations);
    fprintf(stdout, "Max key : %lu\n",
            (unsigned long)FLAGS_max_key);
    fprintf(stdout, "Ratio #ops/#keys : %f\n",
            (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
    fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
    fprintf(stdout, "Batches/snapshots : %d\n",
            FLAGS_test_batches_snapshots);
    fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
    fprintf(stdout, "Num keys per lock : %d\n",
            1 << FLAGS_log2_keys_per_lock);
    std::string compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression : %s\n", compression.c_str());
    std::string checksum = ChecksumTypeToString(FLAGS_checksum_type_e);
    fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
    fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
            FLAGS_subcompactions);

    // Describe the selected memtable representation.
    const char* memtablerep = "";
    switch (FLAGS_rep_factory) {
      case kSkipList:
        memtablerep = "skip_list";
        break;
      case kHashSkipList:
        memtablerep = "prefix_hash";
        break;
      case kVectorRep:
        memtablerep = "vector";
        break;
    }

    fprintf(stdout, "Memtablerep : %s\n", memtablerep);

    fprintf(stdout, "Test kill odd : %d\n", rocksdb_kill_odds);
    if (!rocksdb_kill_prefix_blacklist.empty()) {
      fprintf(stdout, "Skipping kill points prefixes:\n");
      for (auto& p : rocksdb_kill_prefix_blacklist) {
        fprintf(stdout, " %s\n", p.c_str());
      }
    }

    fprintf(stdout, "------------------------------------------------\n");
  }
2290
  // Opens (or re-opens) the database: builds Options either from individual
  // command-line flags or from --options_file, reconciles/creates column
  // families, and — when --use_txn is set — opens a TransactionDB and
  // randomly commits or rolls back any transactions recovered in the
  // prepared state. Exits the process on any unrecoverable open error.
  void Open() {
    assert(db_ == nullptr);
#ifndef ROCKSDB_LITE
    assert(txn_db_ == nullptr);
#endif
    if (FLAGS_options_file.empty()) {
      // Build options from the individual flags.
      BlockBasedTableOptions block_based_options;
      block_based_options.block_cache = cache_;
      block_based_options.block_cache_compressed = compressed_cache_;
      block_based_options.checksum = FLAGS_checksum_type_e;
      block_based_options.block_size = FLAGS_block_size;
      block_based_options.format_version =
          static_cast<uint32_t>(FLAGS_format_version);
      block_based_options.index_block_restart_interval =
          static_cast<int32_t>(FLAGS_index_block_restart_interval);
      block_based_options.filter_policy = filter_policy_;
      options_.table_factory.reset(
          NewBlockBasedTableFactory(block_based_options));
      options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
      options_.write_buffer_size = FLAGS_write_buffer_size;
      options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
      options_.min_write_buffer_number_to_merge =
          FLAGS_min_write_buffer_number_to_merge;
      options_.max_write_buffer_number_to_maintain =
          FLAGS_max_write_buffer_number_to_maintain;
      options_.memtable_prefix_bloom_size_ratio =
          FLAGS_memtable_prefix_bloom_size_ratio;
      options_.max_background_compactions = FLAGS_max_background_compactions;
      options_.max_background_flushes = FLAGS_max_background_flushes;
      options_.compaction_style =
          static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style);
      options_.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
      options_.max_open_files = FLAGS_open_files;
      options_.statistics = dbstats;
      options_.env = FLAGS_env;
      options_.use_fsync = FLAGS_use_fsync;
      options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
      options_.allow_mmap_reads = FLAGS_mmap_read;
      options_.allow_mmap_writes = FLAGS_mmap_write;
      options_.use_direct_reads = FLAGS_use_direct_reads;
      options_.use_direct_io_for_flush_and_compaction =
          FLAGS_use_direct_io_for_flush_and_compaction;
      options_.target_file_size_base = FLAGS_target_file_size_base;
      options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
      options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
      options_.max_bytes_for_level_multiplier =
          FLAGS_max_bytes_for_level_multiplier;
      options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
      options_.level0_slowdown_writes_trigger =
          FLAGS_level0_slowdown_writes_trigger;
      options_.level0_file_num_compaction_trigger =
          FLAGS_level0_file_num_compaction_trigger;
      options_.compression = FLAGS_compression_type_e;
      options_.compression_opts.max_dict_bytes =
          FLAGS_compression_max_dict_bytes;
      options_.compression_opts.zstd_max_train_bytes =
          FLAGS_compression_zstd_max_train_bytes;
      options_.create_if_missing = true;
      options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
      options_.inplace_update_support = FLAGS_in_place_update;
      options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
      options_.allow_concurrent_memtable_write =
          FLAGS_allow_concurrent_memtable_write;
      options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
      options_.enable_write_thread_adaptive_yield =
          FLAGS_enable_write_thread_adaptive_yield;
      options_.compaction_options_universal.size_ratio =
          FLAGS_universal_size_ratio;
      options_.compaction_options_universal.min_merge_width =
          FLAGS_universal_min_merge_width;
      options_.compaction_options_universal.max_merge_width =
          FLAGS_universal_max_merge_width;
      options_.compaction_options_universal.max_size_amplification_percent =
          FLAGS_universal_max_size_amplification_percent;
    } else {
#ifdef ROCKSDB_LITE
      fprintf(stderr, "--options_file not supported in lite mode\n");
      exit(1);
#else
      // Load DB and column-family options from the given options file.
      DBOptions db_options;
      std::vector<ColumnFamilyDescriptor> cf_descriptors;
      Status s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(),
                                     &db_options, &cf_descriptors);
      if (!s.ok()) {
        fprintf(stderr, "Unable to load options file %s --- %s\n",
                FLAGS_options_file.c_str(), s.ToString().c_str());
        exit(1);
      }
      options_ = Options(db_options, cf_descriptors[0].options);
#endif  // ROCKSDB_LITE
    }

    if (FLAGS_rate_limiter_bytes_per_sec > 0) {
      options_.rate_limiter.reset(NewGenericRateLimiter(
          FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
          10 /* fairness */,
          FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                    : RateLimiter::Mode::kWritesOnly));
      if (FLAGS_rate_limit_bg_reads) {
        options_.new_table_reader_for_compaction_inputs = true;
      }
    }

    if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
      // NOTE(review): "prefeix_size" in the message below is a typo for
      // "prefix_size" (left unchanged here; fixing it changes output).
      fprintf(stderr,
              "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
      exit(1);
    }
    if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
      fprintf(stderr,
              "WARNING: prefix_size is non-zero but "
              "memtablerep != prefix_hash\n");
    }
    switch (FLAGS_rep_factory) {
      case kSkipList:
        // no need to do anything
        break;
#ifndef ROCKSDB_LITE
      case kHashSkipList:
        options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
        break;
      case kVectorRep:
        options_.memtable_factory.reset(new VectorRepFactory());
        break;
#else
      default:
        fprintf(stderr,
                "RocksdbLite only supports skip list mem table. Skip "
                "--rep_factory\n");
#endif  // ROCKSDB_LITE
    }

    if (FLAGS_use_full_merge_v1) {
      options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
    } else {
      options_.merge_operator = MergeOperators::CreatePutOperator();
    }

    fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

    Status s;
    if (FLAGS_ttl == -1) {
      std::vector<std::string> existing_column_families;
      s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
                                 &existing_column_families);  // ignore errors
      if (!s.ok()) {
        // DB doesn't exist
        assert(existing_column_families.empty());
        assert(column_family_names_.empty());
        column_family_names_.push_back(kDefaultColumnFamilyName);
      } else if (column_family_names_.empty()) {
        // this is the first call to the function Open()
        column_family_names_ = existing_column_families;
      } else {
        // this is a reopen. just assert that existing column_family_names are
        // equivalent to what we remember
        auto sorted_cfn = column_family_names_;
        std::sort(sorted_cfn.begin(), sorted_cfn.end());
        std::sort(existing_column_families.begin(),
                  existing_column_families.end());
        if (sorted_cfn != existing_column_families) {
          fprintf(stderr,
                  "Expected column families differ from the existing:\n");
          printf("Expected: {");
          for (auto cf : sorted_cfn) {
            printf("%s ", cf.c_str());
          }
          printf("}\n");
          printf("Existing: {");
          for (auto cf : existing_column_families) {
            printf("%s ", cf.c_str());
          }
          printf("}\n");
        }
        assert(sorted_cfn == existing_column_families);
      }
      std::vector<ColumnFamilyDescriptor> cf_descriptors;
      for (auto name : column_family_names_) {
        if (name != kDefaultColumnFamilyName) {
          // Non-default CF names are stringified integers; keep the
          // next-name counter ahead of every existing one.
          new_column_family_name_ =
              std::max(new_column_family_name_.load(), std::stoi(name) + 1);
        }
        cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
      }
      // Top up with fresh CFs until we reach FLAGS_column_families.
      while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
        std::string name = ToString(new_column_family_name_.load());
        new_column_family_name_++;
        cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
        column_family_names_.push_back(name);
      }
      options_.listeners.clear();
      options_.listeners.emplace_back(
          new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
      options_.create_missing_column_families = true;
      if (!FLAGS_use_txn) {
        s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                     &column_families_, &db_);
      } else {
#ifndef ROCKSDB_LITE
        TransactionDBOptions txn_db_options;
        // For the moment it is sufficient to test WRITE_PREPARED policy
        txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
        s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
                                cf_descriptors, &column_families_, &txn_db_);
        db_ = txn_db_;
        // After a crash, randomly commit or roll back each transaction
        // recovered in the prepared state.
        std::vector<Transaction*> trans;
        txn_db_->GetAllPreparedTransactions(&trans);
        Random rand(static_cast<uint32_t>(FLAGS_seed));
        for (auto txn : trans) {
          if (rand.OneIn(2)) {
            s = txn->Commit();
            assert(s.ok());
          } else {
            s = txn->Rollback();
            assert(s.ok());
          }
          delete txn;
        }
        trans.clear();
        txn_db_->GetAllPreparedTransactions(&trans);
        assert(trans.size() == 0);
#endif
      }
      assert(!s.ok() || column_families_.size() ==
                            static_cast<size_t>(FLAGS_column_families));
    } else {
#ifndef ROCKSDB_LITE
      // TTL mode: single (default) column family only.
      DBWithTTL* db_with_ttl;
      s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
      db_ = db_with_ttl;
#else
      fprintf(stderr, "TTL is not supported in RocksDBLite\n");
      exit(1);
#endif
    }
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }
2533
2534 void Reopen() {
2535 for (auto cf : column_families_) {
2536 delete cf;
2537 }
2538 column_families_.clear();
2539 delete db_;
2540 db_ = nullptr;
2541 #ifndef ROCKSDB_LITE
2542 txn_db_ = nullptr;
2543 #endif
2544
2545 num_times_reopened_++;
2546 auto now = FLAGS_env->NowMicros();
2547 fprintf(stdout, "%s Reopening database for the %dth time\n",
2548 FLAGS_env->TimeToString(now/1000000).c_str(),
2549 num_times_reopened_);
2550 Open();
2551 }
2552
2553 void PrintStatistics() {
2554 if (dbstats) {
2555 fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
2556 }
2557 }
2558
  std::shared_ptr<Cache> cache_;             // block cache shared across CFs
  std::shared_ptr<Cache> compressed_cache_;  // optional compressed block cache
  std::shared_ptr<const FilterPolicy> filter_policy_;
  DB* db_;                                   // owned; deleted in Reopen()
#ifndef ROCKSDB_LITE
  TransactionDB* txn_db_;                    // aliases db_ when FLAGS_use_txn
#endif
  Options options_;
  std::vector<ColumnFamilyHandle*> column_families_;  // owned handles
  std::vector<std::string> column_family_names_;
  std::atomic<int> new_column_family_name_;  // next numeric CF name
  int num_times_reopened_;
  std::unordered_map<std::string, std::vector<std::string>> options_table_;
  std::vector<std::string> options_index_;
};
2574
// Stress-test variant in which every operation acts on a single key in a
// single column family (as opposed to the batched/snapshot variant).
class NonBatchedOpsStressTest : public StressTest {
 public:
  NonBatchedOpsStressTest() {}

  virtual ~NonBatchedOpsStressTest() {}
2580
  // Verifies the DB contents against the shared expected-state array. The
  // key space is partitioned across threads; each thread checks its slice of
  // every column family, randomly choosing between an iterator sweep and
  // per-key Gets.
  virtual void VerifyDb(ThreadState* thread) const {
    ReadOptions options(FLAGS_verify_checksum, true);
    auto shared = thread->shared;
    const int64_t max_key = shared->GetMaxKey();
    const int64_t keys_per_thread = max_key / shared->GetNumThreads();
    int64_t start = keys_per_thread * thread->tid;
    int64_t end = start + keys_per_thread;
    if (thread->tid == shared->GetNumThreads() - 1) {
      // Last thread also covers the remainder of the key space.
      end = max_key;
    }
    for (size_t cf = 0; cf < column_families_.size(); ++cf) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }
      if (!thread->rand.OneIn(2)) {
        // Use iterator to verify this range
        unique_ptr<Iterator> iter(
            db_->NewIterator(options, column_families_[cf]));
        iter->Seek(Key(start));
        for (auto i = start; i < end; i++) {
          if (thread->shared->HasVerificationFailedYet()) {
            break;
          }
          // TODO(ljin): update "long" to uint64_t
          // Reseek when the prefix changes
          if (i % (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size)) ==
              0) {
            iter->Seek(Key(i));
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = iter->status();
          if (iter->Valid()) {
            if (iter->key().compare(k) > 0) {
              // Iterator is already past key i: i is absent from the DB.
              s = Status::NotFound(Slice());
            } else if (iter->key().compare(k) == 0) {
              from_db = iter->value().ToString();
              iter->Next();
            } else if (iter->key().compare(k) < 0) {
              VerificationAbort(shared, "An out of range key was found",
                                static_cast<int>(cf), i);
            }
          } else {
            // The iterator found no value for the key in question, so do not
            // move to the next item in the iterator
            s = Status::NotFound(Slice());
          }
          VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
                      true);
          if (from_db.length()) {
            PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
                          from_db.data(), from_db.length());
          }
        }
      } else {
        // Use Get to verify this range
        for (auto i = start; i < end; i++) {
          if (thread->shared->HasVerificationFailedYet()) {
            break;
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = db_->Get(options, column_families_[cf], k, &from_db);
          VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
                      true);
          if (from_db.length()) {
            PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
                          from_db.data(), from_db.length());
          }
        }
      }
    }
  }
2656
  // With probability 1/FLAGS_clear_column_family_one_in, drops a random
  // non-default column family and recreates it under a fresh numeric name.
  // A failed drop or create is fatal (std::terminate).
  virtual void MaybeClearOneColumnFamily(ThreadState* thread) {
    if (FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) {
      if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) {
        // drop column family and then create it again (can't drop default)
        int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
        std::string new_name =
            ToString(new_column_family_name_.fetch_add(1));
        {
          // Take the shared mutex only to serialize the log message.
          MutexLock l(thread->shared->GetMutex());
          fprintf(
              stdout,
              "[CF %d] Dropping and recreating column family. new name: %s\n",
              cf, new_name.c_str());
        }
        // Block per-key operations on this CF while its handle is swapped.
        thread->shared->LockColumnFamily(cf);
        Status s = db_->DropColumnFamily(column_families_[cf]);
        delete column_families_[cf];
        if (!s.ok()) {
          fprintf(stderr, "dropping column family error: %s\n",
                  s.ToString().c_str());
          std::terminate();
        }
        s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
                                    &column_families_[cf]);
        column_family_names_[cf] = new_name;
        // Reset the expected state for this CF before unblocking writers.
        thread->shared->ClearColumnFamily(cf);
        if (!s.ok()) {
          fprintf(stderr, "creating column family error: %s\n",
                  s.ToString().c_str());
          std::terminate();
        }
        thread->shared->UnlockColumnFamily(cf);
      }
    }
  }
2692
  // Non-batched ops verify against the shared expected-state array, so each
  // key must be locked while it is read or written.
  virtual bool ShouldAcquireMutexOnKey() const { return true; }
2694
2695 virtual Status TestGet(ThreadState* thread,
2696 const ReadOptions& read_opts,
2697 const std::vector<int>& rand_column_families,
2698 const std::vector<int64_t>& rand_keys) {
2699 auto cfh = column_families_[rand_column_families[0]];
2700 std::string key_str = Key(rand_keys[0]);
2701 Slice key = key_str;
2702 std::string from_db;
2703 Status s = db_->Get(read_opts, cfh, key, &from_db);
2704 if (s.ok()) {
2705 // found case
2706 thread->stats.AddGets(1, 1);
2707 } else if (s.IsNotFound()) {
2708 // not found case
2709 thread->stats.AddGets(1, 0);
2710 } else {
2711 // errors case
2712 thread->stats.AddErrors(1);
2713 }
2714 return s;
2715 }
2716
2717 virtual Status TestPrefixScan(ThreadState* thread,
2718 const ReadOptions& read_opts,
2719 const std::vector<int>& rand_column_families,
2720 const std::vector<int64_t>& rand_keys) {
2721 auto cfh = column_families_[rand_column_families[0]];
2722 std::string key_str = Key(rand_keys[0]);
2723 Slice key = key_str;
2724 Slice prefix = Slice(key.data(), FLAGS_prefix_size);
2725
2726 std::string upper_bound;
2727 Slice ub_slice;
2728 ReadOptions ro_copy = read_opts;
2729 if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
2730 // For half of the time, set the upper bound to the next prefix
2731 ub_slice = Slice(upper_bound);
2732 ro_copy.iterate_upper_bound = &ub_slice;
2733 }
2734
2735 Iterator* iter = db_->NewIterator(ro_copy, cfh);
2736 int64_t count = 0;
2737 for (iter->Seek(prefix);
2738 iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
2739 ++count;
2740 }
2741 assert(count <=
2742 (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
2743 Status s = iter->status();
2744 if (iter->status().ok()) {
2745 thread->stats.AddPrefixes(1, static_cast<int>(count));
2746 } else {
2747 thread->stats.AddErrors(1);
2748 }
2749 delete iter;
2750 return s;
2751 }
2752
// Writes one random key/value pair via Put or Merge (optionally inside a
// transaction) and mirrors the write into the shared expected state so a
// later read can verify it.
// - value: caller-provided scratch buffer the generated value is built in.
// - lock: holds the per-key mutex; it is re-pointed at a different key's
//   mutex if the initially chosen key must not be overwritten.
virtual Status TestPut(ThreadState* thread,
    WriteOptions& write_opts, const ReadOptions& read_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys,
    char (&value) [100], std::unique_ptr<MutexLock>& lock) {
  auto shared = thread->shared;
  int64_t max_key = shared->GetMaxKey();
  int64_t rand_key = rand_keys[0];
  int rand_column_family = rand_column_families[0];
  // If the chosen key disallows overwrites and already exists (or merge
  // is in use, which would append to it), re-roll key and column family.
  // Release the old key's mutex before taking a new one so we never hold
  // two key mutexes at once.
  while (!shared->AllowsOverwrite(rand_key) &&
         (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
    lock.reset();
    rand_key = thread->rand.Next() % max_key;
    rand_column_family = thread->rand.Next() % FLAGS_column_families;
    lock.reset(new MutexLock(
        shared->GetMutexForKey(rand_column_family, rand_key)));
  }

  std::string key_str = Key(rand_key);
  Slice key = key_str;
  ColumnFamilyHandle* cfh = column_families_[rand_column_family];

  if (FLAGS_verify_before_write) {
    // Read the current value and check it against the expected state
    // before overwriting (strict mode: deletions must match too).
    std::string key_str2 = Key(rand_key);
    Slice k = key_str2;
    std::string from_db;
    Status s = db_->Get(read_opts, cfh, k, &from_db);
    if (!VerifyValue(rand_column_family, rand_key, read_opts, shared,
                     from_db, s, true)) {
      return s;
    }
  }
  uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
  size_t sz = GenerateValue(value_base, value, sizeof(value));
  Slice v(value, sz);
  // Mark the write pending in the expected state before touching the DB;
  // presumably a crash in between then reads back as "unknown" during
  // verification -- TODO confirm against SharedState::Put.
  shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
  Status s;
  if (FLAGS_use_merge) {
    if (!FLAGS_use_txn) {
      s = db_->Merge(write_opts, cfh, key, v);
    } else {
#ifndef ROCKSDB_LITE
      // Transactional path: merge inside a fresh transaction and commit.
      Transaction* txn;
      s = NewTxn(write_opts, &txn);
      if (s.ok()) {
        s = txn->Merge(cfh, key, v);
        if (s.ok()) {
          s = CommitTxn(txn);
        }
      }
#endif
    }
  } else {
    if (!FLAGS_use_txn) {
      s = db_->Put(write_opts, cfh, key, v);
    } else {
#ifndef ROCKSDB_LITE
      // Transactional path: put inside a fresh transaction and commit.
      Transaction* txn;
      s = NewTxn(write_opts, &txn);
      if (s.ok()) {
        s = txn->Put(cfh, key, v);
        if (s.ok()) {
          s = CommitTxn(txn);
        }
      }
#endif
    }
  }
  // Clear the pending marker now that the DB write has completed.
  shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
  if (!s.ok()) {
    // Any write failure is fatal for the stress run.
    fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
    std::terminate();
  }
  thread->stats.AddBytesForWrites(1, sz);
  PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key),
                value, sz);
  return s;
}
2831
// Deletes one random key and mirrors the deletion into the shared
// expected state. Keys that permit overwrites are removed with Delete;
// keys that do not are removed with SingleDelete (valid since such keys
// are written at most once).
virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys,
    std::unique_ptr<MutexLock>& lock) {
  int64_t rand_key = rand_keys[0];
  int rand_column_family = rand_column_families[0];
  auto shared = thread->shared;
  int64_t max_key = shared->GetMaxKey();

  // OPERATION delete
  // If the chosen key does not allow overwrite and it does not exist,
  // choose another key. Release the old key's mutex before taking a
  // new one so we never hold two key mutexes at once.
  while (!shared->AllowsOverwrite(rand_key) &&
         !shared->Exists(rand_column_family, rand_key)) {
    lock.reset();
    rand_key = thread->rand.Next() % max_key;
    rand_column_family = thread->rand.Next() % FLAGS_column_families;
    lock.reset(new MutexLock(
        shared->GetMutexForKey(rand_column_family, rand_key)));
  }

  std::string key_str = Key(rand_key);
  Slice key = key_str;
  auto cfh = column_families_[rand_column_family];

  // Use delete if the key may be overwritten and a single deletion
  // otherwise.
  Status s;
  if (shared->AllowsOverwrite(rand_key)) {
    // Mark pending before the DB write, clear afterwards (same
    // pending protocol as TestPut).
    shared->Delete(rand_column_family, rand_key, true /* pending */);
    if (!FLAGS_use_txn) {
      s = db_->Delete(write_opts, cfh, key);
    } else {
#ifndef ROCKSDB_LITE
      // Transactional path: delete inside a fresh transaction and commit.
      Transaction* txn;
      s = NewTxn(write_opts, &txn);
      if (s.ok()) {
        s = txn->Delete(cfh, key);
        if (s.ok()) {
          s = CommitTxn(txn);
        }
      }
#endif
    }
    shared->Delete(rand_column_family, rand_key, false /* pending */);
    thread->stats.AddDeletes(1);
    if (!s.ok()) {
      // Any delete failure is fatal for the stress run.
      fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
      std::terminate();
    }
  } else {
    shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
    if (!FLAGS_use_txn) {
      s = db_->SingleDelete(write_opts, cfh, key);
    } else {
#ifndef ROCKSDB_LITE
      // Transactional path: single-delete inside a fresh transaction.
      Transaction* txn;
      s = NewTxn(write_opts, &txn);
      if (s.ok()) {
        s = txn->SingleDelete(cfh, key);
        if (s.ok()) {
          s = CommitTxn(txn);
        }
      }
#endif
    }
    shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
    thread->stats.AddSingleDeletes(1);
    if (!s.ok()) {
      fprintf(stderr, "single delete error: %s\n",
              s.ToString().c_str());
      std::terminate();
    }
  }
  return s;
}
2908
// Deletes a contiguous range of FLAGS_range_deletion_width keys with
// DeleteRange and mirrors the deletion into the shared expected state.
virtual Status TestDeleteRange(ThreadState* thread,
    WriteOptions& write_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys,
    std::unique_ptr<MutexLock>& lock) {
  // OPERATION delete range
  std::vector<std::unique_ptr<MutexLock>> range_locks;
  // delete range does not respect disallowed overwrites. the keys for
  // which overwrites are disallowed are randomly distributed so it
  // could be expensive to find a range where each key allows
  // overwrites.
  int64_t rand_key = rand_keys[0];
  int rand_column_family = rand_column_families[0];
  auto shared = thread->shared;
  int64_t max_key = shared->GetMaxKey();
  // Re-roll the start key if [rand_key, rand_key + width) would run past
  // max_key; the incoming lock is released first since it guards the
  // original key.
  if (rand_key > max_key - FLAGS_range_deletion_width) {
    lock.reset();
    rand_key = thread->rand.Next() %
               (max_key - FLAGS_range_deletion_width + 1);
    range_locks.emplace_back(new MutexLock(
        shared->GetMutexForKey(rand_column_family, rand_key)));
  } else {
    range_locks.emplace_back(std::move(lock));
  }
  // One mutex covers 2^FLAGS_log2_keys_per_lock consecutive keys, so
  // additionally lock every stripe boundary the range crosses.
  for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
    if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
      range_locks.emplace_back(new MutexLock(
          shared->GetMutexForKey(rand_column_family, rand_key + j)));
    }
  }
  // Mark the whole range pending in the expected state before the DB write.
  shared->DeleteRange(rand_column_family, rand_key,
                      rand_key + FLAGS_range_deletion_width,
                      true /* pending */);

  std::string keystr = Key(rand_key);
  Slice key = keystr;
  auto cfh = column_families_[rand_column_family];
  std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
  Slice end_key = end_keystr;
  Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
  if (!s.ok()) {
    // Any range-deletion failure is fatal for the stress run.
    fprintf(stderr, "delete range error: %s\n",
            s.ToString().c_str());
    std::terminate();
  }
  // Clear the pending marker; covered is the number of keys the range
  // deletion actually removed from the expected state.
  int covered = shared->DeleteRange(
      rand_column_family, rand_key,
      rand_key + FLAGS_range_deletion_width, false /* pending */);
  thread->stats.AddRangeDeletions(1);
  thread->stats.AddCoveredByRangeDeletions(covered);
  return s;
}
2961
2962 #ifdef ROCKSDB_LITE
// Stub for RocksDB lite builds: file ingestion is unavailable, so
// reaching this method indicates a misconfigured test run; abort loudly.
virtual void TestIngestExternalFile(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */,
    std::unique_ptr<MutexLock>& /* lock */) {
  assert(false);
  fprintf(stderr,
          "RocksDB lite does not support "
          "TestIngestExternalFile\n");
  std::terminate();
}
2974 #else
2975 virtual void TestIngestExternalFile(
2976 ThreadState* thread, const std::vector<int>& rand_column_families,
2977 const std::vector<int64_t>& rand_keys, std::unique_ptr<MutexLock>& lock) {
2978 const std::string sst_filename =
2979 FLAGS_db + "/." + ToString(thread->tid) + ".sst";
2980 Status s;
2981 if (FLAGS_env->FileExists(sst_filename).ok()) {
2982 // Maybe we terminated abnormally before, so cleanup to give this file
2983 // ingestion a clean slate
2984 s = FLAGS_env->DeleteFile(sst_filename);
2985 }
2986
2987 SstFileWriter sst_file_writer(EnvOptions(), options_);
2988 if (s.ok()) {
2989 s = sst_file_writer.Open(sst_filename);
2990 }
2991 int64_t key_base = rand_keys[0];
2992 int column_family = rand_column_families[0];
2993 std::vector<std::unique_ptr<MutexLock> > range_locks;
2994 std::vector<uint32_t> values;
2995 SharedState* shared = thread->shared;
2996
2997 // Grab locks, set pending state on expected values, and add keys
2998 for (int64_t key = key_base;
2999 s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
3000 shared->GetMaxKey());
3001 ++key) {
3002 if (key == key_base) {
3003 range_locks.emplace_back(std::move(lock));
3004 } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
3005 range_locks.emplace_back(
3006 new MutexLock(shared->GetMutexForKey(column_family, key)));
3007 }
3008
3009 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
3010 values.push_back(value_base);
3011 shared->Put(column_family, key, value_base, true /* pending */);
3012
3013 char value[100];
3014 size_t value_len = GenerateValue(value_base, value, sizeof(value));
3015 auto key_str = Key(key);
3016 s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
3017 }
3018
3019 if (s.ok()) {
3020 s = sst_file_writer.Finish();
3021 }
3022 if (s.ok()) {
3023 s = db_->IngestExternalFile(column_families_[column_family],
3024 {sst_filename}, IngestExternalFileOptions());
3025 }
3026 if (!s.ok()) {
3027 fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
3028 std::terminate();
3029 }
3030 int64_t key = key_base;
3031 for (int32_t value : values) {
3032 shared->Put(column_family, key, value, false /* pending */);
3033 ++key;
3034 }
3035 }
3036 #endif // ROCKSDB_LITE
3037
3038 bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
3039 SharedState* shared, const std::string& value_from_db,
3040 Status s, bool strict = false) const {
3041 if (shared->HasVerificationFailedYet()) {
3042 return false;
3043 }
3044 // compare value_from_db with the value in the shared state
3045 char value[kValueMaxLen];
3046 uint32_t value_base = shared->Get(cf, key);
3047 if (value_base == SharedState::UNKNOWN_SENTINEL) {
3048 return true;
3049 }
3050 if (value_base == SharedState::DELETION_SENTINEL && !strict) {
3051 return true;
3052 }
3053
3054 if (s.ok()) {
3055 if (value_base == SharedState::DELETION_SENTINEL) {
3056 VerificationAbort(shared, "Unexpected value found", cf, key);
3057 return false;
3058 }
3059 size_t sz = GenerateValue(value_base, value, sizeof(value));
3060 if (value_from_db.length() != sz) {
3061 VerificationAbort(shared, "Length of value read is not equal", cf, key);
3062 return false;
3063 }
3064 if (memcmp(value_from_db.data(), value, sz) != 0) {
3065 VerificationAbort(shared, "Contents of value read don't match", cf,
3066 key);
3067 return false;
3068 }
3069 } else {
3070 if (value_base != SharedState::DELETION_SENTINEL) {
3071 VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
3072 return false;
3073 }
3074 }
3075 return true;
3076 }
3077 };
3078
// Stress test variant where every logical write of (K, V) is expanded into
// ten physical entries ("0"+K, "0"+V) .. ("9"+K, "9"+V) applied atomically
// in one WriteBatch. Reads then verify, within a single snapshot, that all
// ten entries carry the same underlying value -- exercising batch atomicity
// and snapshot consistency rather than per-key expected values. No shared
// expected-state bookkeeping or per-key locking is performed (the lock
// parameters are ignored and VerifyDb is a no-op).
class BatchedOpsStressTest : public StressTest {
 public:
  BatchedOpsStressTest() {}
  virtual ~BatchedOpsStressTest() {}

  // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
  // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
  // Also refer BatchedOpsStressTest::TestGet
  virtual Status TestPut(ThreadState* thread,
      WriteOptions& write_opts, const ReadOptions& /* read_opts */,
      const std::vector<int>& rand_column_families, const std::vector<int64_t>& rand_keys,
      char (&value)[100], std::unique_ptr<MutexLock>& /* lock */) {
    uint32_t value_base =
        thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
    size_t sz = GenerateValue(value_base, value, sizeof(value));
    Slice v(value, sz);
    // Digit prefixes in descending order; order is irrelevant to readers
    // because the batch is applied atomically.
    std::string keys[10] = {"9", "8", "7", "6", "5",
                            "4", "3", "2", "1", "0"};
    std::string values[10] = {"9", "8", "7", "6", "5",
                              "4", "3", "2", "1", "0"};
    Slice value_slices[10];
    WriteBatch batch;
    Status s;
    auto cfh = column_families_[rand_column_families[0]];
    std::string key_str = Key(rand_keys[0]);
    for (int i = 0; i < 10; i++) {
      keys[i] += key_str;
      values[i] += v.ToString();
      value_slices[i] = values[i];
      if (FLAGS_use_merge) {
        batch.Merge(cfh, keys[i], value_slices[i]);
      } else {
        batch.Put(cfh, keys[i], value_slices[i]);
      }
    }

    s = db_->Write(write_opts, &batch);
    if (!s.ok()) {
      fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    } else {
      // we did 10 writes each of size sz + 1
      thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
    }

    return s;
  }

  // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
  // in DB atomically i.e in a single batch. Also refer MultiGet.
  virtual Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
      const std::vector<int>& rand_column_families,
      const std::vector<int64_t>& rand_keys,
      std::unique_ptr<MutexLock>& /* lock */) {
    std::string keys[10] = {"9", "7", "5", "3", "1",
                            "8", "6", "4", "2", "0"};

    WriteBatch batch;
    Status s;
    auto cfh = column_families_[rand_column_families[0]];
    std::string key_str = Key(rand_keys[0]);
    for (int i = 0; i < 10; i++) {
      keys[i] += key_str;
      batch.Delete(cfh, keys[i]);
    }

    s = db_->Write(writeoptions, &batch);
    if (!s.ok()) {
      fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    } else {
      thread->stats.AddDeletes(10);
    }

    return s;
  }

  // Range deletions are not modeled in batched mode.
  virtual Status TestDeleteRange(ThreadState* /* thread */,
      WriteOptions& /* write_opts */,
      const std::vector<int>& /* rand_column_families */,
      const std::vector<int64_t>& /* rand_keys */,
      std::unique_ptr<MutexLock>& /* lock */) {
    assert(false);
    return Status::NotSupported("BatchedOpsStressTest does not support "
                                "TestDeleteRange");
  }

  // File ingestion is not modeled in batched mode; reaching this is a
  // configuration error, so abort loudly.
  virtual void TestIngestExternalFile(
      ThreadState* /* thread */,
      const std::vector<int>& /* rand_column_families */,
      const std::vector<int64_t>& /* rand_keys */,
      std::unique_ptr<MutexLock>& /* lock */) {
    assert(false);
    fprintf(stderr,
            "BatchedOpsStressTest does not support "
            "TestIngestExternalFile\n");
    std::terminate();
  }

  // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
  // in the same snapshot, and verifies that all the values are of the form
  // "0"+V, "1"+V,..."9"+V.
  // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
  // the DB.
  virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys) {
    std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
    Slice key_slices[10];
    std::string values[10];
    // All ten reads go through one snapshot so they see a consistent
    // batch (or none of it).
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db_->GetSnapshot();
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    auto cfh = column_families_[rand_column_families[0]];
    std::string from_db;
    Status s;
    for (int i = 0; i < 10; i++) {
      keys[i] += key.ToString();
      key_slices[i] = keys[i];
      s = db_->Get(readoptionscopy, cfh, key_slices[i], &from_db);
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        thread->stats.AddErrors(1);
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
        thread->stats.AddGets(1, 0);
      } else {
        values[i] = from_db;

        // The stored value must start with the same digit as its key.
        // NOTE(review): if from_db were empty, indexing [0] would touch
        // the string's terminating null -- presumably values written by
        // TestPut are never empty; confirm.
        char expected_prefix = (keys[i])[0];
        char actual_prefix = (values[i])[0];
        if (actual_prefix != expected_prefix) {
          fprintf(stderr, "error expected prefix = %c actual = %c\n",
                  expected_prefix, actual_prefix);
        }
        (values[i])[0] = ' '; // blank out the differing character
        thread->stats.AddGets(1, 1);
      }
    }
    db_->ReleaseSnapshot(readoptionscopy.snapshot);

    // Now that we retrieved all values, check that they all match
    for (int i = 1; i < 10; i++) {
      if (values[i] != values[0]) {
        fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
                key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
                StringToHex(values[i]).c_str());
        // we continue after error rather than exiting so that we can
        // find more errors if any
      }
    }

    return s;
  }

  // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
  // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
  // of the key. Each of these 10 scans returns a series of values;
  // each series should be the same length, and it is verified for each
  // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
  // ASSUMES that MultiPut was used to put (K, V)
  virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
      const std::vector<int>& rand_column_families,
      const std::vector<int64_t>& rand_keys) {
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    auto cfh = column_families_[rand_column_families[0]];
    std::string prefixes[10] = {"0", "1", "2", "3", "4",
                                "5", "6", "7", "8", "9"};
    Slice prefix_slices[10];
    ReadOptions readoptionscopy[10];
    // One snapshot shared by all ten iterators keeps the scans mutually
    // consistent.
    const Snapshot* snapshot = db_->GetSnapshot();
    Iterator* iters[10];
    std::string upper_bounds[10];
    Slice ub_slices[10];
    // NOTE(review): s is never assigned after this point, so iterator
    // errors surface only through the asserts below -- confirm intended.
    Status s = Status::OK();
    for (int i = 0; i < 10; i++) {
      prefixes[i] += key.ToString();
      prefixes[i].resize(FLAGS_prefix_size);
      prefix_slices[i] = Slice(prefixes[i]);
      readoptionscopy[i] = readoptions;
      readoptionscopy[i].snapshot = snapshot;
      if (thread->rand.OneIn(2) &&
          GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
        // For half of the time, set the upper bound to the next prefix
        ub_slices[i] = Slice(upper_bounds[i]);
        readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
      }
      iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
      iters[i]->Seek(prefix_slices[i]);
    }

    // Advance all ten iterators in lockstep, comparing values row by row.
    int count = 0;
    while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
      count++;
      std::string values[10];
      // get list of all values for this iteration
      for (int i = 0; i < 10; i++) {
        // no iterator should finish before the first one
        assert(iters[i]->Valid() &&
               iters[i]->key().starts_with(prefix_slices[i]));
        values[i] = iters[i]->value().ToString();

        // Each stored value must start with its scan's digit prefix.
        char expected_first = (prefixes[i])[0];
        char actual_first = (values[i])[0];

        if (actual_first != expected_first) {
          fprintf(stderr, "error expected first = %c actual = %c\n",
                  expected_first, actual_first);
        }
        (values[i])[0] = ' '; // blank out the differing character
      }
      // make sure all values are equivalent
      for (int i = 0; i < 10; i++) {
        if (values[i] != values[0]) {
          fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n",
                  i, prefixes[i].c_str(), StringToHex(values[0]).c_str(),
                  StringToHex(values[i]).c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        }
        iters[i]->Next();
      }
    }

    // cleanup iterators and snapshot
    for (int i = 0; i < 10; i++) {
      // if the first iterator finished, they should have all finished
      assert(!iters[i]->Valid() ||
             !iters[i]->key().starts_with(prefix_slices[i]));
      assert(iters[i]->status().ok());
      delete iters[i];
    }
    db_->ReleaseSnapshot(snapshot);

    if (s.ok()) {
      thread->stats.AddPrefixes(1, count);
    } else {
      thread->stats.AddErrors(1);
    }

    return s;
  }

  // Batched mode has no per-key expected state to verify, so this is a
  // no-op.
  virtual void VerifyDb(ThreadState* /* thread */) const {}
};
3329
3330 } // namespace rocksdb
3331
// Entry point: parses gflags, validates flag combinations (each invalid
// combination prints an error and exits with status 1), configures the
// environment, then runs either the batched or non-batched stress test.
// Returns 0 when the stress run succeeds, 1 otherwise.
int main(int argc, char** argv) {
  SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                  " [OPTIONS]...");
  ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }
  // Translate string-valued flags into their enum/factory equivalents.
  FLAGS_compression_type_e =
      StringToCompressionType(FLAGS_compression_type.c_str());
  FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str());
  if (!FLAGS_hdfs.empty()) {
    // Route all file I/O through HDFS when an hdfs URI is supplied.
    FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }
  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // The number of background threads should be at least as much the
  // max number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
  FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
                                  rocksdb::Env::Priority::BOTTOM);
  // Prefix operations require a positive prefix size.
  if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: prefixpercent is non-zero while prefix_size is "
            "not positive!\n");
    exit(1);
  }
  if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: please specify prefix_size for "
            "test_batches_snapshots test!\n");
    exit(1);
  }
  if (FLAGS_memtable_prefix_bloom_size_ratio > 0.0 && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: please specify positive prefix_size in order to use "
            "memtable_prefix_bloom_size_ratio\n");
    exit(1);
  }
  // The per-operation percentages must partition 100%.
  if ((FLAGS_readpercent + FLAGS_prefixpercent +
       FLAGS_writepercent + FLAGS_delpercent + FLAGS_delrangepercent +
       FLAGS_iterpercent) != 100) {
    fprintf(stderr,
            "Error: Read+Prefix+Write+Delete+DeleteRange+Iterate percents != "
            "100!\n");
    exit(1);
  }
  if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
    fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n");
    exit(1);
  }
  if ((unsigned)FLAGS_reopen >= FLAGS_ops_per_thread) {
    fprintf(stderr,
            "Error: #DB-reopens should be < ops_per_thread\n"
            "Provided reopens = %d and ops_per_thread = %lu\n",
            FLAGS_reopen,
            (unsigned long)FLAGS_ops_per_thread);
    exit(1);
  }
  if (FLAGS_test_batches_snapshots && FLAGS_delrangepercent > 0) {
    fprintf(stderr, "Error: nonzero delrangepercent unsupported in "
                    "test_batches_snapshots mode\n");
    exit(1);
  }
  if (FLAGS_active_width > FLAGS_max_key) {
    fprintf(stderr, "Error: active_width can be at most max_key\n");
    exit(1);
  } else if (FLAGS_active_width == 0) {
    // 0 means "use the whole key space".
    FLAGS_active_width = FLAGS_max_key;
  }
  if (FLAGS_value_size_mult * kRandomValueMaxFactor > kValueMaxLen) {
    fprintf(stderr, "Error: value_size_mult can be at most %d\n",
            kValueMaxLen / kRandomValueMaxFactor);
    exit(1);
  }
  if (FLAGS_use_merge && FLAGS_nooverwritepercent == 100) {
    fprintf(
        stderr,
        "Error: nooverwritepercent must not be 100 when using merge operands");
    exit(1);
  }
  if (FLAGS_ingest_external_file_one_in > 0 && FLAGS_nooverwritepercent > 0) {
    fprintf(stderr,
            "Error: nooverwritepercent must be 0 when using file ingestion\n");
    exit(1);
  }

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbstress";
    FLAGS_db = default_db_path;
  }

  // Configure the kill-point injection used for crash testing.
  rocksdb_kill_odds = FLAGS_kill_random_test;
  rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);

  std::unique_ptr<rocksdb::StressTest> stress;
  if (FLAGS_test_batches_snapshots) {
    stress.reset(new rocksdb::BatchedOpsStressTest());
  } else {
    stress.reset(new rocksdb::NonBatchedOpsStressTest());
  }
  if (stress->Run()) {
    return 0;
  } else {
    return 1;
  }
}
3442
3443 #endif // GFLAGS