]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | // | |
10 | // The test uses an array to compare against values written to the database. | |
11 | // Keys written to the array are in 1:1 correspondence to the actual values in | |
12 | // the database according to the formula in the function GenerateValue. | |
13 | ||
14 | // Space is reserved in the array from 0 to FLAGS_max_key and values are | |
15 | // randomly written/deleted/read from those positions. During verification we | |
16 | // compare all the positions in the array. To shorten/elongate the running | |
17 | // time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread, | |
18 | // (sometimes also FLAGS_threads). | |
19 | // | |
20 | // NOTE that if FLAGS_test_batches_snapshots is set, the test will have | |
21 | // different behavior. See comment of the flag for details. | |
22 | ||
23 | #ifndef GFLAGS | |
24 | #include <cstdio> | |
// Fallback entry point compiled when gflags is unavailable: the tool cannot
// parse its command line, so it reports the missing dependency and fails.
int main() {
  fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
29 | #else | |
30 | ||
11fdf7f2 | 31 | #ifndef __STDC_FORMAT_MACROS |
7c673cae | 32 | #define __STDC_FORMAT_MACROS |
11fdf7f2 TL |
33 | #endif // __STDC_FORMAT_MACROS |
34 | ||
35 | #include <fcntl.h> | |
7c673cae FG |
36 | #include <inttypes.h> |
37 | #include <stdio.h> | |
38 | #include <stdlib.h> | |
39 | #include <sys/types.h> | |
40 | #include <algorithm> | |
41 | #include <chrono> | |
42 | #include <exception> | |
11fdf7f2 | 43 | #include <queue> |
7c673cae FG |
44 | #include <thread> |
45 | ||
7c673cae FG |
46 | #include "db/db_impl.h" |
47 | #include "db/version_set.h" | |
48 | #include "hdfs/env_hdfs.h" | |
49 | #include "monitoring/histogram.h" | |
11fdf7f2 | 50 | #include "options/options_helper.h" |
7c673cae FG |
51 | #include "port/port.h" |
52 | #include "rocksdb/cache.h" | |
53 | #include "rocksdb/env.h" | |
54 | #include "rocksdb/slice.h" | |
55 | #include "rocksdb/slice_transform.h" | |
56 | #include "rocksdb/statistics.h" | |
11fdf7f2 TL |
57 | #include "rocksdb/utilities/backupable_db.h" |
58 | #include "rocksdb/utilities/checkpoint.h" | |
7c673cae | 59 | #include "rocksdb/utilities/db_ttl.h" |
11fdf7f2 TL |
60 | #include "rocksdb/utilities/options_util.h" |
61 | #include "rocksdb/utilities/transaction.h" | |
62 | #include "rocksdb/utilities/transaction_db.h" | |
7c673cae FG |
63 | #include "rocksdb/write_batch.h" |
64 | #include "util/coding.h" | |
65 | #include "util/compression.h" | |
66 | #include "util/crc32c.h" | |
11fdf7f2 | 67 | #include "util/gflags_compat.h" |
7c673cae FG |
68 | #include "util/logging.h" |
69 | #include "util/mutexlock.h" | |
70 | #include "util/random.h" | |
71 | #include "util/string_util.h" | |
11fdf7f2 TL |
72 | // SyncPoint is not supported in Released Windows Mode. |
73 | #if !(defined NDEBUG) || !defined(OS_WIN) | |
74 | #include "util/sync_point.h" | |
75 | #endif // !(defined NDEBUG) || !defined(OS_WIN) | |
7c673cae | 76 | #include "util/testutil.h" |
11fdf7f2 | 77 | |
7c673cae FG |
78 | #include "utilities/merge_operators.h" |
79 | ||
11fdf7f2 TL |
80 | using GFLAGS_NAMESPACE::ParseCommandLineFlags; |
81 | using GFLAGS_NAMESPACE::RegisterFlagValidator; | |
82 | using GFLAGS_NAMESPACE::SetUsageMessage; | |
7c673cae FG |
83 | |
// One kibibyte; used to scale byte-count flag defaults such as --max_key and
// --cache_size.
static constexpr long KB = 1L << 10;
// NOTE(review): the two limits below are consumed outside this view;
// presumably they bound generated value sizes — confirm at the call sites.
static constexpr int kRandomValueMaxFactor = 3;
static constexpr int kValueMaxLen = 100;
7c673cae FG |
87 | |
// gflags validator: accepts `value` only if it fits in a uint32_t.
// On overflow, prints a diagnostic naming the flag to stderr and returns
// false; otherwise returns true.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // PRIu64 (rather than %lu) prints the full 64-bit value even on LLP64
    // platforms, where `unsigned long` is only 32 bits wide.
    fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n",
            flagname, value);
    return false;
  }
  return true;
}
98 | ||
// ---------------------------------------------------------------------------
// Command-line flags.
//
// Flags whose default is written as rocksdb::Options().<field> or
// rocksdb::BlockBasedTableOptions().<field> start from the library's own
// default for that option.
//
// Each `static const bool FLAGS_<name>_dummy` is initialized by calling
// RegisterFlagValidator(), so the validator gets attached to the flag during
// static initialization; the bool is marked __unused__.
// ---------------------------------------------------------------------------
DEFINE_uint64(seed, 2341234, "Seed for PRNG");
static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);

DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");

DEFINE_int64(max_key, 1 * KB * KB,
             "Max number of key/values to place in database");

DEFINE_int32(column_families, 10, "Number of column families");

DEFINE_string(
    options_file, "",
    "The path to a RocksDB options file. If specified, then db_stress will "
    "run with the RocksDB options in the default column family of the "
    "specified options file. Note that, when an options file is provided, "
    "db_stress will ignore the flag values for all options that may be passed "
    "via options file.");

DEFINE_int64(
    active_width, 0,
    "Number of keys in active span of the key-range at any given time. The "
    "span begins with its left endpoint at key 0, gradually moves rightwards, "
    "and ends with its right endpoint at max_key. If set to 0, active_width "
    "will be sanitized to be equal to max_key.");

// TODO(noetzli) Add support for single deletes
DEFINE_bool(test_batches_snapshots, false,
            "If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
            " which read/write/delete multiple keys in a batch. In this mode,"
            " we do not verify db content by comparing the content with the "
            "pre-allocated array. Instead, we do partial verification inside"
            " MultiGet() by checking various values in a batch. Benefit of"
            " this mode:\n"
            "\t(a) No need to acquire mutexes during writes (less cache "
            "flushes in multi-core leading to speed up)\n"
            "\t(b) No long validation at the end (more speed up)\n"
            "\t(c) Test snapshot and atomicity of batch writes");

DEFINE_bool(atomic_flush, false,
            "If set, enables atomic flush in the options.\n");

DEFINE_bool(test_atomic_flush, false,
            "If set, runs the stress test dedicated to verifying atomic flush "
            "functionality. Setting this implies `atomic_flush=true`.\n");

DEFINE_int32(threads, 32, "Number of concurrent threads to run.");

DEFINE_int32(ttl, -1,
             "Opens the db with this ttl value if this is not -1. "
             "Carefully specify a large value such that verifications on "
             "deleted values don't fail");

DEFINE_int32(value_size_mult, 8,
             "Size of value will be this number times rand_int(1,3) bytes");

DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");

DEFINE_bool(enable_pipelined_write, false, "Pipeline WAL/memtable writes");

DEFINE_bool(verify_before_write, false, "Verify before write");

DEFINE_bool(histogram, false, "Print histogram of operation timings");

DEFINE_bool(destroy_db_initially, true,
            "Destroys the database dir before start if this is true");

DEFINE_bool(verbose, false, "Verbose");

DEFINE_bool(progress_reports, true,
            "If true, db_stress will report number of finished operations");

// Memtable / write-buffer sizing.
DEFINE_uint64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
              "Number of bytes to buffer in all memtables before compacting");

DEFINE_int32(write_buffer_size,
             static_cast<int32_t>(rocksdb::Options().write_buffer_size),
             "Number of bytes to buffer in memtable before compacting");

DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. "
             "Each memtable is of size FLAGS_write_buffer_size.");

DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together "
             "before writing to storage. This is cheap because it is an "
             "in-memory merge. If this feature is not enabled, then all these "
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check "
             "in all of these files. Also, an in-memory merge may result in "
             "writing less data to storage if there are duplicate records in"
             " each of these individual write buffers.");

DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed. If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

DEFINE_double(memtable_prefix_bloom_size_ratio,
              rocksdb::Options().memtable_prefix_bloom_size_ratio,
              "creates prefix blooms for memtables, each with size "
              "`write_buffer_size * memtable_prefix_bloom_size_ratio`.");

DEFINE_bool(memtable_whole_key_filtering,
            rocksdb::Options().memtable_whole_key_filtering,
            "Enable whole key filtering in memtables.");

DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time "
             "(use default if == 0)");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data."
             " Negative means use default settings.");

// Compaction shape and triggers.
DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, "");

DEFINE_int32(level0_file_num_compaction_trigger,
             rocksdb::Options().level0_file_num_compaction_trigger,
             "Level0 compaction start trigger");

DEFINE_int32(level0_slowdown_writes_trigger,
             rocksdb::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0 that will slow down writes");

DEFINE_int32(level0_stop_writes_trigger,
             rocksdb::Options().level0_stop_writes_trigger,
             "Number of files in level-0 that will trigger put stop.");

// Block-based table format.
DEFINE_int32(block_size,
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
             "Number of bytes in a block.");

DEFINE_int32(
    format_version,
    static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
    "Format version of SST files.");

DEFINE_int32(index_block_restart_interval,
             rocksdb::BlockBasedTableOptions().index_block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in index block.");

// Background threads.
DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions "
             "that can occur in parallel.");

DEFINE_int32(num_bottom_pri_threads, 0,
             "The number of threads in the bottom-priority thread pool (used "
             "by universal compaction only).");

DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
             "The interval (in milliseconds) to adjust compaction thread pool "
             "size. Don't change it periodically if the value is 0.");

DEFINE_int32(compaction_thread_pool_variations, 2,
             "Range of background thread pool size variations when adjusted "
             "periodically.");

DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
             "The maximum number of concurrent background flushes "
             "that can occur in parallel.");

// Universal compaction tuning (0 leaves the setting untouched — presumably;
// confirm where these flags are applied).
DEFINE_int32(universal_size_ratio, 0, "The ratio of file sizes that trigger"
             " compaction in universal style");

DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files to "
             "compact in universal style compaction");

DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");

DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");

// Randomized operations injected during the run.
DEFINE_int32(clear_column_family_one_in, 1000000,
             "With a chance of 1/N, delete a column family and then recreate "
             "it again. If N == 0, never drop/create column families. "
             "When test_batches_snapshots is true, this flag has no effect");

DEFINE_int32(set_options_one_in, 0,
             "With a chance of 1/N, change some random options");

DEFINE_int32(set_in_place_one_in, 0,
             "With a chance of 1/N, toggle in place support option");

// Block cache.
DEFINE_int64(cache_size, 2LL * KB * KB * KB,
             "Number of bytes to use as a cache of uncompressed data.");

DEFINE_bool(use_clock_cache, false,
            "Replace default LRU block cache with clock cache.");

DEFINE_uint64(subcompactions, 1,
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");

// Write-path concurrency.
DEFINE_bool(allow_concurrent_memtable_write, false,
            "Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(enable_write_thread_adaptive_yield, true,
            "Use a yielding spin loop for brief writer thread waits.");

static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
315 | ||
// gflags validator: rejects negative values. Zero is accepted, despite the
// "Positive" in the name. Prints a diagnostic to stderr on rejection.
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
  const bool non_negative = value >= 0;
  if (!non_negative) {
    fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n", flagname,
            value);
  }
  return non_negative;
}
324 | DEFINE_int32(reopen, 10, "Number of times database reopens"); | |
11fdf7f2 | 325 | static const bool FLAGS_reopen_dummy __attribute__((__unused__)) = |
7c673cae FG |
326 | RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive); |
327 | ||
328 | DEFINE_int32(bloom_bits, 10, "Bloom filter bits per key. " | |
329 | "Negative means use default settings."); | |
330 | ||
331 | DEFINE_bool(use_block_based_filter, false, "use block based filter" | |
332 | "instead of full filter for block based table"); | |
333 | ||
334 | DEFINE_string(db, "", "Use the db with the following name."); | |
335 | ||
11fdf7f2 TL |
336 | DEFINE_string( |
337 | expected_values_path, "", | |
338 | "File where the array of expected uint32_t values will be stored. If " | |
339 | "provided and non-empty, the DB state will be verified against these " | |
340 | "values after recovery. --max_key and --column_family must be kept the " | |
341 | "same across invocations of this program that use the same " | |
342 | "--expected_values_path."); | |
343 | ||
7c673cae FG |
344 | DEFINE_bool(verify_checksum, false, |
345 | "Verify checksum for every block read from storage"); | |
346 | ||
347 | DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads, | |
348 | "Allow reads to occur via mmap-ing files"); | |
349 | ||
350 | DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes, | |
351 | "Allow writes to occur via mmap-ing files"); | |
352 | ||
353 | DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads, | |
354 | "Use O_DIRECT for reading data"); | |
355 | ||
356 | DEFINE_bool(use_direct_io_for_flush_and_compaction, | |
357 | rocksdb::Options().use_direct_io_for_flush_and_compaction, | |
358 | "Use O_DIRECT for writing data"); | |
359 | ||
360 | // Database statistics | |
361 | static std::shared_ptr<rocksdb::Statistics> dbstats; | |
362 | DEFINE_bool(statistics, false, "Create database statistics"); | |
363 | ||
364 | DEFINE_bool(sync, false, "Sync all writes to disk"); | |
365 | ||
366 | DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync"); | |
367 | ||
368 | DEFINE_int32(kill_random_test, 0, | |
369 | "If non-zero, kill at various points in source code with " | |
370 | "probability 1/this"); | |
11fdf7f2 | 371 | static const bool FLAGS_kill_random_test_dummy __attribute__((__unused__)) = |
7c673cae FG |
372 | RegisterFlagValidator(&FLAGS_kill_random_test, &ValidateInt32Positive); |
373 | extern int rocksdb_kill_odds; | |
374 | ||
375 | DEFINE_string(kill_prefix_blacklist, "", | |
376 | "If non-empty, kill points with prefix in the list given will be" | |
377 | " skipped. Items are comma-separated."); | |
378 | extern std::vector<std::string> rocksdb_kill_prefix_blacklist; | |
379 | ||
380 | DEFINE_bool(disable_wal, false, "If true, do not write WAL for write."); | |
381 | ||
494da23a TL |
382 | DEFINE_uint64(recycle_log_file_num, rocksdb::Options().recycle_log_file_num, |
383 | "Number of old WAL files to keep around for later recycling"); | |
384 | ||
11fdf7f2 | 385 | DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base, |
7c673cae FG |
386 | "Target level-1 file size for compaction"); |
387 | ||
388 | DEFINE_int32(target_file_size_multiplier, 1, | |
389 | "A multiplier to compute target level-N file size (N >= 2)"); | |
390 | ||
11fdf7f2 TL |
391 | DEFINE_uint64(max_bytes_for_level_base, |
392 | rocksdb::Options().max_bytes_for_level_base, | |
393 | "Max bytes for level-1"); | |
7c673cae FG |
394 | |
395 | DEFINE_double(max_bytes_for_level_multiplier, 2, | |
396 | "A multiplier to compute max bytes for level-N (N >= 2)"); | |
397 | ||
398 | DEFINE_int32(range_deletion_width, 10, | |
399 | "The width of the range deletion intervals."); | |
400 | ||
11fdf7f2 TL |
401 | DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); |
402 | ||
403 | DEFINE_bool(rate_limit_bg_reads, false, | |
404 | "Use options.rate_limiter on compaction reads"); | |
405 | ||
406 | DEFINE_bool(use_txn, false, | |
407 | "Use TransactionDB. Currently the default write policy is " | |
408 | "TxnDBWritePolicy::WRITE_PREPARED"); | |
409 | ||
410 | DEFINE_int32(backup_one_in, 0, | |
411 | "If non-zero, then CreateNewBackup() will be called once for " | |
412 | "every N operations on average. 0 indicates CreateNewBackup() " | |
413 | "is disabled."); | |
414 | ||
415 | DEFINE_int32(checkpoint_one_in, 0, | |
416 | "If non-zero, then CreateCheckpoint() will be called once for " | |
417 | "every N operations on average. 0 indicates CreateCheckpoint() " | |
418 | "is disabled."); | |
419 | ||
420 | DEFINE_int32(ingest_external_file_one_in, 0, | |
421 | "If non-zero, then IngestExternalFile() will be called once for " | |
422 | "every N operations on average. 0 indicates IngestExternalFile() " | |
423 | "is disabled."); | |
424 | ||
425 | DEFINE_int32(ingest_external_file_width, 1000, | |
426 | "The width of the ingested external files."); | |
427 | ||
7c673cae | 428 | DEFINE_int32(compact_files_one_in, 0, |
11fdf7f2 TL |
429 | "If non-zero, then CompactFiles() will be called once for every N " |
430 | "operations on average. 0 indicates CompactFiles() is disabled."); | |
431 | ||
432 | DEFINE_int32(compact_range_one_in, 0, | |
433 | "If non-zero, then CompactRange() will be called once for every N " | |
434 | "operations on average. 0 indicates CompactRange() is disabled."); | |
435 | ||
436 | DEFINE_int32(flush_one_in, 0, | |
437 | "If non-zero, then Flush() will be called once for every N ops " | |
438 | "on average. 0 indicates calls to Flush() are disabled."); | |
439 | ||
440 | DEFINE_int32(compact_range_width, 10000, | |
441 | "The width of the ranges passed to CompactRange()."); | |
442 | ||
443 | DEFINE_int32(acquire_snapshot_one_in, 0, | |
444 | "If non-zero, then acquires a snapshot once every N operations on " | |
445 | "average."); | |
446 | ||
447 | DEFINE_bool(compare_full_db_state_snapshot, false, | |
448 | "If set we compare state of entire db (in one of the threads) with" | |
449 | "each snapshot."); | |
450 | ||
451 | DEFINE_uint64(snapshot_hold_ops, 0, | |
452 | "If non-zero, then releases snapshots N operations after they're " | |
453 | "acquired."); | |
7c673cae FG |
454 | |
// gflags validator: accepts a percentage in the closed range [0, 100].
// Prints a diagnostic to stderr and returns false for anything else.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value >= 0 && value <= 100) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname,
          value);
  return false;
}
463 | ||
464 | DEFINE_int32(readpercent, 10, | |
465 | "Ratio of reads to total workload (expressed as a percentage)"); | |
11fdf7f2 | 466 | static const bool FLAGS_readpercent_dummy __attribute__((__unused__)) = |
7c673cae FG |
467 | RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent); |
468 | ||
469 | DEFINE_int32(prefixpercent, 20, | |
470 | "Ratio of prefix iterators to total workload (expressed as a" | |
471 | " percentage)"); | |
11fdf7f2 | 472 | static const bool FLAGS_prefixpercent_dummy __attribute__((__unused__)) = |
7c673cae FG |
473 | RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent); |
474 | ||
475 | DEFINE_int32(writepercent, 45, | |
476 | "Ratio of writes to total workload (expressed as a percentage)"); | |
11fdf7f2 | 477 | static const bool FLAGS_writepercent_dummy __attribute__((__unused__)) = |
7c673cae FG |
478 | RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent); |
479 | ||
480 | DEFINE_int32(delpercent, 15, | |
481 | "Ratio of deletes to total workload (expressed as a percentage)"); | |
11fdf7f2 | 482 | static const bool FLAGS_delpercent_dummy __attribute__((__unused__)) = |
7c673cae FG |
483 | RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent); |
484 | ||
485 | DEFINE_int32(delrangepercent, 0, | |
486 | "Ratio of range deletions to total workload (expressed as a " | |
487 | "percentage). Cannot be used with test_batches_snapshots"); | |
11fdf7f2 | 488 | static const bool FLAGS_delrangepercent_dummy __attribute__((__unused__)) = |
7c673cae FG |
489 | RegisterFlagValidator(&FLAGS_delrangepercent, &ValidateInt32Percent); |
490 | ||
491 | DEFINE_int32(nooverwritepercent, 60, | |
492 | "Ratio of keys without overwrite to total workload (expressed as " | |
493 | " a percentage)"); | |
494 | static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) = | |
495 | RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent); | |
496 | ||
497 | DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload" | |
498 | " (expressed as a percentage)"); | |
11fdf7f2 | 499 | static const bool FLAGS_iterpercent_dummy __attribute__((__unused__)) = |
7c673cae FG |
500 | RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent); |
501 | ||
502 | DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run"); | |
11fdf7f2 | 503 | static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) = |
7c673cae FG |
504 | RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range); |
505 | ||
506 | namespace { | |
507 | enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { | |
508 | assert(ctype); | |
509 | ||
510 | if (!strcasecmp(ctype, "none")) | |
511 | return rocksdb::kNoCompression; | |
512 | else if (!strcasecmp(ctype, "snappy")) | |
513 | return rocksdb::kSnappyCompression; | |
514 | else if (!strcasecmp(ctype, "zlib")) | |
515 | return rocksdb::kZlibCompression; | |
516 | else if (!strcasecmp(ctype, "bzip2")) | |
517 | return rocksdb::kBZip2Compression; | |
518 | else if (!strcasecmp(ctype, "lz4")) | |
519 | return rocksdb::kLZ4Compression; | |
520 | else if (!strcasecmp(ctype, "lz4hc")) | |
521 | return rocksdb::kLZ4HCCompression; | |
522 | else if (!strcasecmp(ctype, "xpress")) | |
523 | return rocksdb::kXpressCompression; | |
524 | else if (!strcasecmp(ctype, "zstd")) | |
525 | return rocksdb::kZSTD; | |
526 | ||
11fdf7f2 | 527 | fprintf(stderr, "Cannot parse compression type '%s'\n", ctype); |
7c673cae FG |
528 | return rocksdb::kSnappyCompression; //default value |
529 | } | |
530 | ||
11fdf7f2 TL |
531 | enum rocksdb::ChecksumType StringToChecksumType(const char* ctype) { |
532 | assert(ctype); | |
533 | auto iter = rocksdb::checksum_type_string_map.find(ctype); | |
534 | if (iter != rocksdb::checksum_type_string_map.end()) { | |
535 | return iter->second; | |
536 | } | |
537 | fprintf(stderr, "Cannot parse checksum type '%s'\n", ctype); | |
538 | return rocksdb::kCRC32c; | |
539 | } | |
540 | ||
541 | std::string ChecksumTypeToString(rocksdb::ChecksumType ctype) { | |
542 | auto iter = std::find_if( | |
543 | rocksdb::checksum_type_string_map.begin(), | |
544 | rocksdb::checksum_type_string_map.end(), | |
545 | [&](const std::pair<std::string, rocksdb::ChecksumType>& | |
546 | name_and_enum_val) { return name_and_enum_val.second == ctype; }); | |
547 | assert(iter != rocksdb::checksum_type_string_map.end()); | |
548 | return iter->first; | |
549 | } | |
550 | ||
7c673cae FG |
// Splits `src` on ',' and keeps empty fields, so "a,,b," yields
// {"a", "", "b", ""}. An empty input yields an empty vector, not {""}.
std::vector<std::string> SplitString(std::string src) {
  std::vector<std::string> pieces;
  if (src.empty()) {
    return pieces;
  }
  std::string::size_type start = 0;
  for (;;) {
    const std::string::size_type comma = src.find(',', start);
    if (comma == std::string::npos) {
      // Final field: everything after the last comma (possibly empty).
      pieces.push_back(src.substr(start));
      break;
    }
    pieces.push_back(src.substr(start, comma - start));
    start = comma + 1;
  }
  return pieces;
}
565 | } // namespace | |
566 | ||
// ---------------------------------------------------------------------------
// Compression, checksum, environment, and per-thread workload flags.
// The `*_e` statics hold enum forms of string flags; they are initialized to
// the enum matching each string flag's default. Where they are updated from
// the parsed flag is not visible here — presumably via
// StringToCompressionType()/StringToChecksumType(); TODO confirm.
// ---------------------------------------------------------------------------
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
    rocksdb::kSnappyCompression;

DEFINE_int32(compression_max_dict_bytes, 0,
             "Maximum size of dictionary used to prime the compression "
             "library.");

DEFINE_int32(compression_zstd_max_train_bytes, 0,
             "Maximum size of training data passed to zstd's dictionary "
             "trainer.");

DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
static enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c;

DEFINE_string(hdfs, "", "Name of hdfs environment");
// posix or hdfs environment
// Starts as rocksdb::Env::Default(); replacing it when --hdfs is set is
// presumably done elsewhere (not visible here).
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();

DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);

DEFINE_uint64(log2_keys_per_lock, 2, "Log2 of number of keys per lock");
static const bool FLAGS_log2_keys_per_lock_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_log2_keys_per_lock, &ValidateUint32Range);

DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file");

DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
598 | ||
// Memtable representation kinds. StringToRepFactory() maps the names
// "skip_list", "prefix_hash" and "vector" onto these values.
enum RepFactory { kSkipList = 0, kHashSkipList = 1, kVectorRep = 2 };
604 | ||
605 | namespace { | |
606 | enum RepFactory StringToRepFactory(const char* ctype) { | |
607 | assert(ctype); | |
608 | ||
609 | if (!strcasecmp(ctype, "skip_list")) | |
610 | return kSkipList; | |
611 | else if (!strcasecmp(ctype, "prefix_hash")) | |
612 | return kHashSkipList; | |
613 | else if (!strcasecmp(ctype, "vector")) | |
614 | return kVectorRep; | |
615 | ||
616 | fprintf(stdout, "Cannot parse memreptable %s\n", ctype); | |
617 | return kSkipList; | |
618 | } | |
11fdf7f2 TL |
619 | |
620 | #ifdef _MSC_VER | |
621 | #pragma warning(push) | |
622 | // truncation of constant value on static_cast | |
623 | #pragma warning(disable : 4309) | |
624 | #endif | |
625 | bool GetNextPrefix(const rocksdb::Slice& src, std::string* v) { | |
626 | std::string ret = src.ToString(); | |
627 | for (int i = static_cast<int>(ret.size()) - 1; i >= 0; i--) { | |
628 | if (ret[i] != static_cast<char>(255)) { | |
629 | ret[i] = ret[i] + 1; | |
630 | break; | |
631 | } else if (i != 0) { | |
632 | ret[i] = 0; | |
633 | } else { | |
634 | // all FF. No next prefix | |
635 | return false; | |
636 | } | |
637 | } | |
638 | *v = ret; | |
639 | return true; | |
640 | } | |
641 | #ifdef _MSC_VER | |
642 | #pragma warning(pop) | |
643 | #endif | |
7c673cae FG |
644 | } // namespace |
645 | ||
// Enum form of --memtablerep. It is zero-initialized (kSkipList) as a static;
// presumably it is assigned from StringToRepFactory(FLAGS_memtablerep)
// elsewhere — TODO confirm.
static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "prefix_hash", "");
648 | ||
// gflags validator for --prefix_size: accepts values in [0, 8] and prints a
// diagnostic to stderr for anything outside that range.
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  const bool in_range = (value >= 0) && (value <= 8);
  if (!in_range) {
    fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n",
            flagname, value);
  }
  return in_range;
}
657 | DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep"); | |
11fdf7f2 | 658 | static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) = |
7c673cae FG |
659 | RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); |
660 | ||
661 | DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge " | |
662 | "that behaves like a Put"); | |
663 | ||
664 | DEFINE_bool(use_full_merge_v1, false, | |
665 | "On true, use a merge operator that implement the deprecated " | |
666 | "version of FullMerge"); | |
667 | ||
668 | namespace rocksdb { | |
669 | ||
// Encodes `val` as an 8-byte big-endian string (most significant byte first).
// With this encoding, bytewise (memcmp-style) ordering of the encoded keys
// agrees with numeric ordering for non-negative values.
static std::string Key(int64_t val) {
  const uint64_t bits = static_cast<uint64_t>(val);
  std::string encoded(sizeof(val), '\0');
  for (size_t pos = 0; pos < encoded.size(); ++pos) {
    const size_t shift = 8 * (encoded.size() - 1 - pos);
    encoded[pos] = static_cast<char>((bits >> shift) & 0xFF);
  }
  return encoded;
}
682 | ||
11fdf7f2 TL |
// Decodes an 8-byte big-endian key (the inverse of Key()) into *key_p.
//
// Returns true on success. A key whose length is not exactly 8 bytes is a
// caller bug: it asserts in debug builds. In release builds it returns false
// without touching *key_p, instead of reading past the end of the buffer.
static bool GetIntVal(const std::string& big_endian_key, uint64_t* key_p) {
  const size_t size_key = sizeof(*key_p);
  assert(big_endian_key.size() == size_key);
  if (big_endian_key.size() != size_key) {
    return false;
  }
  uint64_t decoded = 0;
  for (const char c : big_endian_key) {
    // Most significant byte first; cast so bytes >= 0x80 don't sign-extend.
    decoded = (decoded << 8) | static_cast<unsigned char>(c);
  }
  *key_p = decoded;
  return true;
}
694 | ||
7c673cae FG |
695 | static std::string StringToHex(const std::string& str) { |
696 | std::string result = "0x"; | |
697 | result.append(Slice(str).ToString(true)); | |
698 | return result; | |
699 | } | |
700 | ||
701 | ||
702 | class StressTest; | |
703 | namespace { | |
704 | ||
705 | class Stats { | |
706 | private: | |
707 | uint64_t start_; | |
708 | uint64_t finish_; | |
709 | double seconds_; | |
710 | long done_; | |
711 | long gets_; | |
712 | long prefixes_; | |
713 | long writes_; | |
714 | long deletes_; | |
715 | size_t single_deletes_; | |
716 | long iterator_size_sums_; | |
717 | long founds_; | |
718 | long iterations_; | |
719 | long range_deletions_; | |
720 | long covered_by_range_deletions_; | |
721 | long errors_; | |
722 | long num_compact_files_succeed_; | |
723 | long num_compact_files_failed_; | |
724 | int next_report_; | |
725 | size_t bytes_; | |
726 | uint64_t last_op_finish_; | |
727 | HistogramImpl hist_; | |
728 | ||
729 | public: | |
730 | Stats() { } | |
731 | ||
732 | void Start() { | |
733 | next_report_ = 100; | |
734 | hist_.Clear(); | |
735 | done_ = 0; | |
736 | gets_ = 0; | |
737 | prefixes_ = 0; | |
738 | writes_ = 0; | |
739 | deletes_ = 0; | |
740 | single_deletes_ = 0; | |
741 | iterator_size_sums_ = 0; | |
742 | founds_ = 0; | |
743 | iterations_ = 0; | |
744 | range_deletions_ = 0; | |
745 | covered_by_range_deletions_ = 0; | |
746 | errors_ = 0; | |
747 | bytes_ = 0; | |
748 | seconds_ = 0; | |
749 | num_compact_files_succeed_ = 0; | |
750 | num_compact_files_failed_ = 0; | |
751 | start_ = FLAGS_env->NowMicros(); | |
752 | last_op_finish_ = start_; | |
753 | finish_ = start_; | |
754 | } | |
755 | ||
756 | void Merge(const Stats& other) { | |
757 | hist_.Merge(other.hist_); | |
758 | done_ += other.done_; | |
759 | gets_ += other.gets_; | |
760 | prefixes_ += other.prefixes_; | |
761 | writes_ += other.writes_; | |
762 | deletes_ += other.deletes_; | |
763 | single_deletes_ += other.single_deletes_; | |
764 | iterator_size_sums_ += other.iterator_size_sums_; | |
765 | founds_ += other.founds_; | |
766 | iterations_ += other.iterations_; | |
767 | range_deletions_ += other.range_deletions_; | |
768 | covered_by_range_deletions_ = other.covered_by_range_deletions_; | |
769 | errors_ += other.errors_; | |
770 | bytes_ += other.bytes_; | |
771 | seconds_ += other.seconds_; | |
772 | num_compact_files_succeed_ += other.num_compact_files_succeed_; | |
773 | num_compact_files_failed_ += other.num_compact_files_failed_; | |
774 | if (other.start_ < start_) start_ = other.start_; | |
775 | if (other.finish_ > finish_) finish_ = other.finish_; | |
776 | } | |
777 | ||
778 | void Stop() { | |
779 | finish_ = FLAGS_env->NowMicros(); | |
780 | seconds_ = (finish_ - start_) * 1e-6; | |
781 | } | |
782 | ||
783 | void FinishedSingleOp() { | |
784 | if (FLAGS_histogram) { | |
785 | auto now = FLAGS_env->NowMicros(); | |
786 | auto micros = now - last_op_finish_; | |
787 | hist_.Add(micros); | |
788 | if (micros > 20000) { | |
789 | fprintf(stdout, "long op: %" PRIu64 " micros%30s\r", micros, ""); | |
790 | } | |
791 | last_op_finish_ = now; | |
792 | } | |
793 | ||
794 | done_++; | |
795 | if (FLAGS_progress_reports) { | |
796 | if (done_ >= next_report_) { | |
797 | if (next_report_ < 1000) next_report_ += 100; | |
798 | else if (next_report_ < 5000) next_report_ += 500; | |
799 | else if (next_report_ < 10000) next_report_ += 1000; | |
800 | else if (next_report_ < 50000) next_report_ += 5000; | |
801 | else if (next_report_ < 100000) next_report_ += 10000; | |
802 | else if (next_report_ < 500000) next_report_ += 50000; | |
803 | else next_report_ += 100000; | |
804 | fprintf(stdout, "... finished %ld ops%30s\r", done_, ""); | |
805 | } | |
806 | } | |
807 | } | |
808 | ||
494da23a | 809 | void AddBytesForWrites(long nwrites, size_t nbytes) { |
7c673cae FG |
810 | writes_ += nwrites; |
811 | bytes_ += nbytes; | |
812 | } | |
813 | ||
494da23a | 814 | void AddGets(long ngets, long nfounds) { |
7c673cae FG |
815 | founds_ += nfounds; |
816 | gets_ += ngets; | |
817 | } | |
818 | ||
494da23a | 819 | void AddPrefixes(long nprefixes, long count) { |
7c673cae FG |
820 | prefixes_ += nprefixes; |
821 | iterator_size_sums_ += count; | |
822 | } | |
823 | ||
494da23a | 824 | void AddIterations(long n) { iterations_ += n; } |
7c673cae | 825 | |
494da23a | 826 | void AddDeletes(long n) { deletes_ += n; } |
7c673cae FG |
827 | |
828 | void AddSingleDeletes(size_t n) { single_deletes_ += n; } | |
829 | ||
494da23a | 830 | void AddRangeDeletions(long n) { range_deletions_ += n; } |
7c673cae | 831 | |
494da23a | 832 | void AddCoveredByRangeDeletions(long n) { covered_by_range_deletions_ += n; } |
7c673cae | 833 | |
494da23a | 834 | void AddErrors(long n) { errors_ += n; } |
7c673cae | 835 | |
494da23a | 836 | void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; } |
7c673cae | 837 | |
494da23a | 838 | void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; } |
7c673cae FG |
839 | |
840 | void Report(const char* name) { | |
841 | std::string extra; | |
842 | if (bytes_ < 1 || done_ < 1) { | |
843 | fprintf(stderr, "No writes or ops?\n"); | |
844 | return; | |
845 | } | |
846 | ||
847 | double elapsed = (finish_ - start_) * 1e-6; | |
848 | double bytes_mb = bytes_ / 1048576.0; | |
849 | double rate = bytes_mb / elapsed; | |
850 | double throughput = (double)done_/elapsed; | |
851 | ||
852 | fprintf(stdout, "%-12s: ", name); | |
853 | fprintf(stdout, "%.3f micros/op %ld ops/sec\n", | |
854 | seconds_ * 1e6 / done_, (long)throughput); | |
855 | fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n", | |
856 | "", bytes_mb, rate, (100*writes_)/done_, done_); | |
857 | fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_); | |
858 | fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_); | |
859 | fprintf(stdout, "%-12s: Single deleted %" ROCKSDB_PRIszt " times\n", "", | |
860 | single_deletes_); | |
861 | fprintf(stdout, "%-12s: %ld read and %ld found the key\n", "", | |
862 | gets_, founds_); | |
863 | fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_); | |
864 | fprintf(stdout, "%-12s: Iterator size sum is %ld\n", "", | |
865 | iterator_size_sums_); | |
866 | fprintf(stdout, "%-12s: Iterated %ld times\n", "", iterations_); | |
867 | fprintf(stdout, "%-12s: Deleted %ld key-ranges\n", "", range_deletions_); | |
868 | fprintf(stdout, "%-12s: Range deletions covered %ld keys\n", "", | |
869 | covered_by_range_deletions_); | |
870 | ||
871 | fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_); | |
872 | fprintf(stdout, "%-12s: %ld CompactFiles() succeed\n", "", | |
873 | num_compact_files_succeed_); | |
874 | fprintf(stdout, "%-12s: %ld CompactFiles() did not succeed\n", "", | |
875 | num_compact_files_failed_); | |
876 | ||
877 | if (FLAGS_histogram) { | |
878 | fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); | |
879 | } | |
880 | fflush(stdout); | |
881 | } | |
882 | }; | |
883 | ||
884 | // State shared by all concurrent executions of the same benchmark. | |
885 | class SharedState { | |
886 | public: | |
11fdf7f2 TL |
887 | // indicates a key may have any value (or not be present) as an operation on |
888 | // it is incomplete. | |
889 | static const uint32_t UNKNOWN_SENTINEL; | |
890 | // indicates a key should definitely be deleted | |
891 | static const uint32_t DELETION_SENTINEL; | |
7c673cae FG |
892 | |
893 | explicit SharedState(StressTest* stress_test) | |
894 | : cv_(&mu_), | |
895 | seed_(static_cast<uint32_t>(FLAGS_seed)), | |
896 | max_key_(FLAGS_max_key), | |
897 | log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)), | |
898 | num_threads_(FLAGS_threads), | |
899 | num_initialized_(0), | |
900 | num_populated_(0), | |
901 | vote_reopen_(0), | |
902 | num_done_(0), | |
903 | start_(false), | |
904 | start_verify_(false), | |
905 | should_stop_bg_thread_(false), | |
906 | bg_thread_finished_(false), | |
907 | stress_test_(stress_test), | |
908 | verification_failure_(false), | |
11fdf7f2 TL |
909 | no_overwrite_ids_(FLAGS_column_families), |
910 | values_(nullptr) { | |
7c673cae FG |
911 | // Pick random keys in each column family that will not experience |
912 | // overwrite | |
913 | ||
914 | printf("Choosing random keys with no overwrite\n"); | |
11fdf7f2 TL |
915 | Random64 rnd(seed_); |
916 | // Start with the identity permutation. Subsequent iterations of | |
917 | // for loop below will start with perm of previous for loop | |
918 | int64_t *permutation = new int64_t[max_key_]; | |
919 | for (int64_t i = 0; i < max_key_; i++) { | |
920 | permutation[i] = i; | |
921 | } | |
922 | // Now do the Knuth shuffle | |
923 | int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100; | |
924 | // Only need to figure out first num_no_overwrite_keys of permutation | |
925 | no_overwrite_ids_.reserve(num_no_overwrite_keys); | |
926 | for (int64_t i = 0; i < num_no_overwrite_keys; i++) { | |
927 | int64_t rand_index = i + rnd.Next() % (max_key_ - i); | |
928 | // Swap i and rand_index; | |
929 | int64_t temp = permutation[i]; | |
930 | permutation[i] = permutation[rand_index]; | |
931 | permutation[rand_index] = temp; | |
932 | // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of | |
933 | // permutation | |
934 | no_overwrite_ids_.insert(permutation[i]); | |
935 | } | |
936 | delete[] permutation; | |
937 | ||
938 | size_t expected_values_size = | |
939 | sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_; | |
940 | bool values_init_needed = false; | |
941 | Status status; | |
942 | if (!FLAGS_expected_values_path.empty()) { | |
943 | if (!std::atomic<uint32_t>{}.is_lock_free()) { | |
944 | status = Status::InvalidArgument( | |
945 | "Cannot use --expected_values_path on platforms without lock-free " | |
946 | "std::atomic<uint32_t>"); | |
947 | } | |
948 | if (status.ok() && FLAGS_clear_column_family_one_in > 0) { | |
949 | status = Status::InvalidArgument( | |
950 | "Cannot use --expected_values_path on when " | |
951 | "--clear_column_family_one_in is greater than zero."); | |
952 | } | |
953 | uint64_t size = 0; | |
954 | if (status.ok()) { | |
955 | status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size); | |
956 | } | |
494da23a | 957 | std::unique_ptr<WritableFile> wfile; |
11fdf7f2 TL |
958 | if (status.ok() && size == 0) { |
959 | const EnvOptions soptions; | |
960 | status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile, | |
961 | soptions); | |
962 | } | |
963 | if (status.ok() && size == 0) { | |
964 | std::string buf(expected_values_size, '\0'); | |
965 | status = wfile->Append(buf); | |
966 | values_init_needed = true; | |
967 | } | |
968 | if (status.ok()) { | |
969 | status = FLAGS_env->NewMemoryMappedFileBuffer( | |
970 | FLAGS_expected_values_path, &expected_mmap_buffer_); | |
971 | } | |
972 | if (status.ok()) { | |
973 | assert(expected_mmap_buffer_->GetLen() == expected_values_size); | |
974 | values_ = | |
975 | static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->GetBase()); | |
976 | assert(values_ != nullptr); | |
977 | } else { | |
978 | fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", | |
979 | FLAGS_expected_values_path.c_str(), status.ToString().c_str()); | |
980 | assert(values_ == nullptr); | |
981 | } | |
982 | } | |
983 | if (values_ == nullptr) { | |
984 | values_allocation_.reset( | |
985 | new std::atomic<uint32_t>[FLAGS_column_families * max_key_]); | |
986 | values_ = &values_allocation_[0]; | |
987 | values_init_needed = true; | |
988 | } | |
989 | assert(values_ != nullptr); | |
990 | if (values_init_needed) { | |
991 | for (int i = 0; i < FLAGS_column_families; ++i) { | |
992 | for (int j = 0; j < max_key_; ++j) { | |
993 | Delete(i, j, false /* pending */); | |
994 | } | |
7c673cae | 995 | } |
7c673cae FG |
996 | } |
997 | ||
998 | if (FLAGS_test_batches_snapshots) { | |
999 | fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); | |
1000 | return; | |
1001 | } | |
7c673cae FG |
1002 | |
1003 | long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_); | |
1004 | if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { | |
1005 | num_locks++; | |
1006 | } | |
1007 | fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families); | |
1008 | key_locks_.resize(FLAGS_column_families); | |
1009 | ||
1010 | for (int i = 0; i < FLAGS_column_families; ++i) { | |
1011 | key_locks_[i].resize(num_locks); | |
1012 | for (auto& ptr : key_locks_[i]) { | |
1013 | ptr.reset(new port::Mutex); | |
1014 | } | |
1015 | } | |
1016 | } | |
1017 | ||
1018 | ~SharedState() {} | |
1019 | ||
1020 | port::Mutex* GetMutex() { | |
1021 | return &mu_; | |
1022 | } | |
1023 | ||
1024 | port::CondVar* GetCondVar() { | |
1025 | return &cv_; | |
1026 | } | |
1027 | ||
1028 | StressTest* GetStressTest() const { | |
1029 | return stress_test_; | |
1030 | } | |
1031 | ||
1032 | int64_t GetMaxKey() const { | |
1033 | return max_key_; | |
1034 | } | |
1035 | ||
1036 | uint32_t GetNumThreads() const { | |
1037 | return num_threads_; | |
1038 | } | |
1039 | ||
1040 | void IncInitialized() { | |
1041 | num_initialized_++; | |
1042 | } | |
1043 | ||
1044 | void IncOperated() { | |
1045 | num_populated_++; | |
1046 | } | |
1047 | ||
1048 | void IncDone() { | |
1049 | num_done_++; | |
1050 | } | |
1051 | ||
1052 | void IncVotedReopen() { | |
1053 | vote_reopen_ = (vote_reopen_ + 1) % num_threads_; | |
1054 | } | |
1055 | ||
1056 | bool AllInitialized() const { | |
1057 | return num_initialized_ >= num_threads_; | |
1058 | } | |
1059 | ||
1060 | bool AllOperated() const { | |
1061 | return num_populated_ >= num_threads_; | |
1062 | } | |
1063 | ||
1064 | bool AllDone() const { | |
1065 | return num_done_ >= num_threads_; | |
1066 | } | |
1067 | ||
1068 | bool AllVotedReopen() { | |
1069 | return (vote_reopen_ == 0); | |
1070 | } | |
1071 | ||
1072 | void SetStart() { | |
1073 | start_ = true; | |
1074 | } | |
1075 | ||
1076 | void SetStartVerify() { | |
1077 | start_verify_ = true; | |
1078 | } | |
1079 | ||
1080 | bool Started() const { | |
1081 | return start_; | |
1082 | } | |
1083 | ||
1084 | bool VerifyStarted() const { | |
1085 | return start_verify_; | |
1086 | } | |
1087 | ||
1088 | void SetVerificationFailure() { verification_failure_.store(true); } | |
1089 | ||
1090 | bool HasVerificationFailedYet() { return verification_failure_.load(); } | |
1091 | ||
11fdf7f2 | 1092 | port::Mutex* GetMutexForKey(int cf, int64_t key) { |
7c673cae FG |
1093 | return key_locks_[cf][key >> log2_keys_per_lock_].get(); |
1094 | } | |
1095 | ||
1096 | void LockColumnFamily(int cf) { | |
1097 | for (auto& mutex : key_locks_[cf]) { | |
1098 | mutex->Lock(); | |
1099 | } | |
1100 | } | |
1101 | ||
1102 | void UnlockColumnFamily(int cf) { | |
1103 | for (auto& mutex : key_locks_[cf]) { | |
1104 | mutex->Unlock(); | |
1105 | } | |
1106 | } | |
1107 | ||
11fdf7f2 TL |
1108 | std::atomic<uint32_t>& Value(int cf, int64_t key) const { |
1109 | return values_[cf * max_key_ + key]; | |
1110 | } | |
1111 | ||
7c673cae | 1112 | void ClearColumnFamily(int cf) { |
11fdf7f2 TL |
1113 | std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), |
1114 | DELETION_SENTINEL); | |
7c673cae FG |
1115 | } |
1116 | ||
11fdf7f2 TL |
1117 | // @param pending True if the update may have started but is not yet |
1118 | // guaranteed finished. This is useful for crash-recovery testing when the | |
1119 | // process may crash before updating the expected values array. | |
1120 | void Put(int cf, int64_t key, uint32_t value_base, bool pending) { | |
1121 | if (!pending) { | |
1122 | // prevent expected-value update from reordering before Write | |
1123 | std::atomic_thread_fence(std::memory_order_release); | |
1124 | } | |
1125 | Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base, | |
1126 | std::memory_order_relaxed); | |
1127 | if (pending) { | |
1128 | // prevent Write from reordering before expected-value update | |
1129 | std::atomic_thread_fence(std::memory_order_release); | |
1130 | } | |
7c673cae FG |
1131 | } |
1132 | ||
11fdf7f2 | 1133 | uint32_t Get(int cf, int64_t key) const { return Value(cf, key); } |
7c673cae | 1134 | |
11fdf7f2 TL |
1135 | // @param pending See comment above Put() |
1136 | // Returns true if the key was not yet deleted. | |
1137 | bool Delete(int cf, int64_t key, bool pending) { | |
1138 | if (Value(cf, key) == DELETION_SENTINEL) { | |
1139 | return false; | |
1140 | } | |
1141 | Put(cf, key, DELETION_SENTINEL, pending); | |
1142 | return true; | |
1143 | } | |
7c673cae | 1144 | |
11fdf7f2 TL |
1145 | // @param pending See comment above Put() |
1146 | // Returns true if the key was not yet deleted. | |
1147 | bool SingleDelete(int cf, int64_t key, bool pending) { | |
1148 | return Delete(cf, key, pending); | |
1149 | } | |
7c673cae | 1150 | |
11fdf7f2 TL |
1151 | // @param pending See comment above Put() |
1152 | // Returns number of keys deleted by the call. | |
1153 | int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { | |
7c673cae FG |
1154 | int covered = 0; |
1155 | for (int64_t key = begin_key; key < end_key; ++key) { | |
11fdf7f2 | 1156 | if (Delete(cf, key, pending)) { |
7c673cae FG |
1157 | ++covered; |
1158 | } | |
7c673cae FG |
1159 | } |
1160 | return covered; | |
1161 | } | |
1162 | ||
11fdf7f2 TL |
1163 | bool AllowsOverwrite(int64_t key) { |
1164 | return no_overwrite_ids_.find(key) == no_overwrite_ids_.end(); | |
7c673cae FG |
1165 | } |
1166 | ||
11fdf7f2 TL |
1167 | bool Exists(int cf, int64_t key) { |
1168 | // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite | |
1169 | // is disallowed can't be accidentally added a second time, in which case | |
1170 | // SingleDelete wouldn't be able to properly delete the key. It does allow | |
1171 | // the case where a SingleDelete might be added which covers nothing, but | |
1172 | // that's not a correctness issue. | |
1173 | uint32_t expected_value = Value(cf, key).load(); | |
1174 | return expected_value != DELETION_SENTINEL; | |
1175 | } | |
7c673cae FG |
1176 | |
1177 | uint32_t GetSeed() const { return seed_; } | |
1178 | ||
1179 | void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } | |
1180 | ||
1181 | bool ShoudStopBgThread() { return should_stop_bg_thread_; } | |
1182 | ||
1183 | void SetBgThreadFinish() { bg_thread_finished_ = true; } | |
1184 | ||
1185 | bool BgThreadFinished() const { return bg_thread_finished_; } | |
1186 | ||
11fdf7f2 TL |
1187 | bool ShouldVerifyAtBeginning() const { |
1188 | return expected_mmap_buffer_.get() != nullptr; | |
1189 | } | |
1190 | ||
7c673cae FG |
1191 | private: |
1192 | port::Mutex mu_; | |
1193 | port::CondVar cv_; | |
1194 | const uint32_t seed_; | |
1195 | const int64_t max_key_; | |
1196 | const uint32_t log2_keys_per_lock_; | |
1197 | const int num_threads_; | |
1198 | long num_initialized_; | |
1199 | long num_populated_; | |
1200 | long vote_reopen_; | |
1201 | long num_done_; | |
1202 | bool start_; | |
1203 | bool start_verify_; | |
1204 | bool should_stop_bg_thread_; | |
1205 | bool bg_thread_finished_; | |
1206 | StressTest* stress_test_; | |
1207 | std::atomic<bool> verification_failure_; | |
1208 | ||
1209 | // Keys that should not be overwritten | |
11fdf7f2 | 1210 | std::unordered_set<size_t> no_overwrite_ids_; |
7c673cae | 1211 | |
11fdf7f2 TL |
1212 | std::atomic<uint32_t>* values_; |
1213 | std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_; | |
7c673cae FG |
1214 | // Has to make it owned by a smart ptr as port::Mutex is not copyable |
1215 | // and storing it in the container may require copying depending on the impl. | |
1216 | std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_; | |
11fdf7f2 | 1217 | std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_; |
7c673cae FG |
1218 | }; |
1219 | ||
11fdf7f2 TL |
1220 | const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; |
1221 | const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; | |
7c673cae FG |
1222 | |
1223 | // Per-thread state for concurrent executions of the same benchmark. | |
1224 | struct ThreadState { | |
11fdf7f2 TL |
1225 | uint32_t tid; // 0..n-1 |
1226 | Random rand; // Has different seeds for different threads | |
7c673cae FG |
1227 | SharedState* shared; |
1228 | Stats stats; | |
11fdf7f2 TL |
1229 | struct SnapshotState { |
1230 | const Snapshot* snapshot; | |
1231 | // The cf from which we did a Get at this snapshot | |
1232 | int cf_at; | |
1233 | // The name of the cf at the time that we did a read | |
1234 | std::string cf_at_name; | |
1235 | // The key with which we did a Get at this snapshot | |
1236 | std::string key; | |
1237 | // The status of the Get | |
1238 | Status status; | |
1239 | // The value of the Get | |
1240 | std::string value; | |
1241 | // optional state of all keys in the db | |
1242 | std::vector<bool> *key_vec; | |
1243 | }; | |
1244 | std::queue<std::pair<uint64_t, SnapshotState> > snapshot_queue; | |
7c673cae FG |
1245 | |
1246 | ThreadState(uint32_t index, SharedState* _shared) | |
1247 | : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {} | |
1248 | }; | |
1249 | ||
1250 | class DbStressListener : public EventListener { | |
1251 | public: | |
11fdf7f2 TL |
1252 | DbStressListener(const std::string& db_name, |
1253 | const std::vector<DbPath>& db_paths, | |
1254 | const std::vector<ColumnFamilyDescriptor>& column_families) | |
1255 | : db_name_(db_name), | |
1256 | db_paths_(db_paths), | |
1257 | column_families_(column_families), | |
1258 | num_pending_file_creations_(0) {} | |
1259 | virtual ~DbStressListener() { | |
1260 | assert(num_pending_file_creations_ == 0); | |
1261 | } | |
7c673cae | 1262 | #ifndef ROCKSDB_LITE |
11fdf7f2 | 1263 | virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { |
7c673cae FG |
1264 | assert(IsValidColumnFamilyName(info.cf_name)); |
1265 | VerifyFilePath(info.file_path); | |
1266 | // pretending doing some work here | |
1267 | std::this_thread::sleep_for( | |
11fdf7f2 | 1268 | std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000))); |
7c673cae FG |
1269 | } |
1270 | ||
11fdf7f2 TL |
1271 | virtual void OnCompactionCompleted(DB* /*db*/, |
1272 | const CompactionJobInfo& ci) override { | |
7c673cae FG |
1273 | assert(IsValidColumnFamilyName(ci.cf_name)); |
1274 | assert(ci.input_files.size() + ci.output_files.size() > 0U); | |
1275 | for (const auto& file_path : ci.input_files) { | |
1276 | VerifyFilePath(file_path); | |
1277 | } | |
1278 | for (const auto& file_path : ci.output_files) { | |
1279 | VerifyFilePath(file_path); | |
1280 | } | |
1281 | // pretending doing some work here | |
1282 | std::this_thread::sleep_for( | |
11fdf7f2 | 1283 | std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000))); |
7c673cae FG |
1284 | } |
1285 | ||
11fdf7f2 TL |
1286 | virtual void OnTableFileCreationStarted( |
1287 | const TableFileCreationBriefInfo& /*info*/) override { | |
1288 | ++num_pending_file_creations_; | |
1289 | } | |
1290 | virtual void OnTableFileCreated(const TableFileCreationInfo& info) override { | |
7c673cae FG |
1291 | assert(info.db_name == db_name_); |
1292 | assert(IsValidColumnFamilyName(info.cf_name)); | |
11fdf7f2 TL |
1293 | if (info.file_size) { |
1294 | VerifyFilePath(info.file_path); | |
1295 | } | |
7c673cae | 1296 | assert(info.job_id > 0 || FLAGS_compact_files_one_in > 0); |
11fdf7f2 | 1297 | if (info.status.ok() && info.file_size > 0) { |
494da23a TL |
1298 | assert(info.table_properties.data_size > 0 || |
1299 | info.table_properties.num_range_deletions > 0); | |
7c673cae FG |
1300 | assert(info.table_properties.raw_key_size > 0); |
1301 | assert(info.table_properties.num_entries > 0); | |
1302 | } | |
11fdf7f2 | 1303 | --num_pending_file_creations_; |
7c673cae FG |
1304 | } |
1305 | ||
1306 | protected: | |
1307 | bool IsValidColumnFamilyName(const std::string& cf_name) const { | |
1308 | if (cf_name == kDefaultColumnFamilyName) { | |
1309 | return true; | |
1310 | } | |
1311 | // The column family names in the stress tests are numbers. | |
1312 | for (size_t i = 0; i < cf_name.size(); ++i) { | |
1313 | if (cf_name[i] < '0' || cf_name[i] > '9') { | |
1314 | return false; | |
1315 | } | |
1316 | } | |
1317 | return true; | |
1318 | } | |
1319 | ||
1320 | void VerifyFileDir(const std::string& file_dir) { | |
1321 | #ifndef NDEBUG | |
1322 | if (db_name_ == file_dir) { | |
1323 | return; | |
1324 | } | |
1325 | for (const auto& db_path : db_paths_) { | |
1326 | if (db_path.path == file_dir) { | |
1327 | return; | |
1328 | } | |
1329 | } | |
11fdf7f2 TL |
1330 | for (auto& cf : column_families_) { |
1331 | for (const auto& cf_path : cf.options.cf_paths) { | |
1332 | if (cf_path.path == file_dir) { | |
1333 | return; | |
1334 | } | |
1335 | } | |
1336 | } | |
7c673cae | 1337 | assert(false); |
11fdf7f2 TL |
1338 | #else |
1339 | (void)file_dir; | |
7c673cae FG |
1340 | #endif // !NDEBUG |
1341 | } | |
1342 | ||
1343 | void VerifyFileName(const std::string& file_name) { | |
1344 | #ifndef NDEBUG | |
1345 | uint64_t file_number; | |
1346 | FileType file_type; | |
1347 | bool result = ParseFileName(file_name, &file_number, &file_type); | |
1348 | assert(result); | |
1349 | assert(file_type == kTableFile); | |
11fdf7f2 TL |
1350 | #else |
1351 | (void)file_name; | |
7c673cae FG |
1352 | #endif // !NDEBUG |
1353 | } | |
1354 | ||
1355 | void VerifyFilePath(const std::string& file_path) { | |
1356 | #ifndef NDEBUG | |
1357 | size_t pos = file_path.find_last_of("/"); | |
1358 | if (pos == std::string::npos) { | |
1359 | VerifyFileName(file_path); | |
1360 | } else { | |
1361 | if (pos > 0) { | |
1362 | VerifyFileDir(file_path.substr(0, pos)); | |
1363 | } | |
1364 | VerifyFileName(file_path.substr(pos)); | |
1365 | } | |
11fdf7f2 TL |
1366 | #else |
1367 | (void)file_path; | |
7c673cae FG |
1368 | #endif // !NDEBUG |
1369 | } | |
1370 | #endif // !ROCKSDB_LITE | |
1371 | ||
1372 | private: | |
1373 | std::string db_name_; | |
1374 | std::vector<DbPath> db_paths_; | |
11fdf7f2 TL |
1375 | std::vector<ColumnFamilyDescriptor> column_families_; |
1376 | std::atomic<int> num_pending_file_creations_; | |
7c673cae FG |
1377 | }; |
1378 | ||
1379 | } // namespace | |
1380 | ||
1381 | class StressTest { | |
1382 | public: | |
1383 | StressTest() | |
1384 | : cache_(NewCache(FLAGS_cache_size)), | |
1385 | compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)), | |
1386 | filter_policy_(FLAGS_bloom_bits >= 0 | |
1387 | ? FLAGS_use_block_based_filter | |
1388 | ? NewBloomFilterPolicy(FLAGS_bloom_bits, true) | |
1389 | : NewBloomFilterPolicy(FLAGS_bloom_bits, false) | |
1390 | : nullptr), | |
1391 | db_(nullptr), | |
11fdf7f2 TL |
1392 | #ifndef ROCKSDB_LITE |
1393 | txn_db_(nullptr), | |
1394 | #endif | |
7c673cae | 1395 | new_column_family_name_(1), |
494da23a TL |
1396 | num_times_reopened_(0), |
1397 | db_preload_finished_(false) { | |
7c673cae FG |
1398 | if (FLAGS_destroy_db_initially) { |
1399 | std::vector<std::string> files; | |
1400 | FLAGS_env->GetChildren(FLAGS_db, &files); | |
1401 | for (unsigned int i = 0; i < files.size(); i++) { | |
1402 | if (Slice(files[i]).starts_with("heap-")) { | |
1403 | FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]); | |
1404 | } | |
1405 | } | |
494da23a TL |
1406 | Options options; |
1407 | options.env = FLAGS_env; | |
1408 | Status s = DestroyDB(FLAGS_db, options); | |
1409 | if (!s.ok()) { | |
1410 | fprintf(stderr, "Cannot destroy original db: %s\n", | |
1411 | s.ToString().c_str()); | |
1412 | exit(1); | |
1413 | } | |
7c673cae FG |
1414 | } |
1415 | } | |
1416 | ||
11fdf7f2 | 1417 | virtual ~StressTest() { |
7c673cae FG |
1418 | for (auto cf : column_families_) { |
1419 | delete cf; | |
1420 | } | |
1421 | column_families_.clear(); | |
1422 | delete db_; | |
1423 | } | |
1424 | ||
1425 | std::shared_ptr<Cache> NewCache(size_t capacity) { | |
1426 | if (capacity <= 0) { | |
1427 | return nullptr; | |
1428 | } | |
1429 | if (FLAGS_use_clock_cache) { | |
1430 | auto cache = NewClockCache((size_t)capacity); | |
1431 | if (!cache) { | |
1432 | fprintf(stderr, "Clock cache not supported."); | |
1433 | exit(1); | |
1434 | } | |
1435 | return cache; | |
1436 | } else { | |
1437 | return NewLRUCache((size_t)capacity); | |
1438 | } | |
1439 | } | |
1440 | ||
1441 | bool BuildOptionsTable() { | |
1442 | if (FLAGS_set_options_one_in <= 0) { | |
1443 | return true; | |
1444 | } | |
1445 | ||
1446 | std::unordered_map<std::string, std::vector<std::string> > options_tbl = { | |
1447 | {"write_buffer_size", | |
11fdf7f2 TL |
1448 | {ToString(options_.write_buffer_size), |
1449 | ToString(options_.write_buffer_size * 2), | |
1450 | ToString(options_.write_buffer_size * 4)}}, | |
7c673cae | 1451 | {"max_write_buffer_number", |
11fdf7f2 TL |
1452 | {ToString(options_.max_write_buffer_number), |
1453 | ToString(options_.max_write_buffer_number * 2), | |
1454 | ToString(options_.max_write_buffer_number * 4)}}, | |
7c673cae FG |
1455 | {"arena_block_size", |
1456 | { | |
11fdf7f2 TL |
1457 | ToString(options_.arena_block_size), |
1458 | ToString(options_.write_buffer_size / 4), | |
1459 | ToString(options_.write_buffer_size / 8), | |
7c673cae | 1460 | }}, |
7c673cae FG |
1461 | {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}}, |
1462 | {"max_successive_merges", {"0", "2", "4"}}, | |
1463 | {"inplace_update_num_locks", {"100", "200", "300"}}, | |
1464 | // TODO(ljin): enable test for this option | |
1465 | // {"disable_auto_compactions", {"100", "200", "300"}}, | |
1466 | {"soft_rate_limit", {"0", "0.5", "0.9"}}, | |
1467 | {"hard_rate_limit", {"0", "1.1", "2.0"}}, | |
1468 | {"level0_file_num_compaction_trigger", | |
1469 | { | |
11fdf7f2 TL |
1470 | ToString(options_.level0_file_num_compaction_trigger), |
1471 | ToString(options_.level0_file_num_compaction_trigger + 2), | |
1472 | ToString(options_.level0_file_num_compaction_trigger + 4), | |
7c673cae FG |
1473 | }}, |
1474 | {"level0_slowdown_writes_trigger", | |
1475 | { | |
11fdf7f2 TL |
1476 | ToString(options_.level0_slowdown_writes_trigger), |
1477 | ToString(options_.level0_slowdown_writes_trigger + 2), | |
1478 | ToString(options_.level0_slowdown_writes_trigger + 4), | |
7c673cae FG |
1479 | }}, |
1480 | {"level0_stop_writes_trigger", | |
1481 | { | |
11fdf7f2 TL |
1482 | ToString(options_.level0_stop_writes_trigger), |
1483 | ToString(options_.level0_stop_writes_trigger + 2), | |
1484 | ToString(options_.level0_stop_writes_trigger + 4), | |
7c673cae FG |
1485 | }}, |
1486 | {"max_compaction_bytes", | |
1487 | { | |
11fdf7f2 TL |
1488 | ToString(options_.target_file_size_base * 5), |
1489 | ToString(options_.target_file_size_base * 15), | |
1490 | ToString(options_.target_file_size_base * 100), | |
7c673cae FG |
1491 | }}, |
1492 | {"target_file_size_base", | |
1493 | { | |
11fdf7f2 TL |
1494 | ToString(options_.target_file_size_base), |
1495 | ToString(options_.target_file_size_base * 2), | |
1496 | ToString(options_.target_file_size_base * 4), | |
7c673cae FG |
1497 | }}, |
1498 | {"target_file_size_multiplier", | |
1499 | { | |
11fdf7f2 | 1500 | ToString(options_.target_file_size_multiplier), "1", "2", |
7c673cae FG |
1501 | }}, |
1502 | {"max_bytes_for_level_base", | |
1503 | { | |
11fdf7f2 TL |
1504 | ToString(options_.max_bytes_for_level_base / 2), |
1505 | ToString(options_.max_bytes_for_level_base), | |
1506 | ToString(options_.max_bytes_for_level_base * 2), | |
7c673cae FG |
1507 | }}, |
1508 | {"max_bytes_for_level_multiplier", | |
1509 | { | |
11fdf7f2 | 1510 | ToString(options_.max_bytes_for_level_multiplier), "1", "2", |
7c673cae FG |
1511 | }}, |
1512 | {"max_sequential_skip_in_iterations", {"4", "8", "12"}}, | |
1513 | }; | |
1514 | ||
1515 | options_table_ = std::move(options_tbl); | |
1516 | ||
1517 | for (const auto& iter : options_table_) { | |
1518 | options_index_.push_back(iter.first); | |
1519 | } | |
1520 | return true; | |
1521 | } | |
1522 | ||
1523 | bool Run() { | |
11fdf7f2 TL |
1524 | uint64_t now = FLAGS_env->NowMicros(); |
1525 | fprintf(stdout, "%s Initializing db_stress\n", | |
1526 | FLAGS_env->TimeToString(now / 1000000).c_str()); | |
7c673cae | 1527 | PrintEnv(); |
7c673cae | 1528 | Open(); |
11fdf7f2 | 1529 | BuildOptionsTable(); |
7c673cae | 1530 | SharedState shared(this); |
494da23a TL |
1531 | |
1532 | if (FLAGS_read_only) { | |
1533 | now = FLAGS_env->NowMicros(); | |
1534 | fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n", | |
1535 | FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key); | |
1536 | PreloadDbAndReopenAsReadOnly(FLAGS_max_key, &shared); | |
1537 | } | |
7c673cae FG |
1538 | uint32_t n = shared.GetNumThreads(); |
1539 | ||
11fdf7f2 TL |
1540 | now = FLAGS_env->NowMicros(); |
1541 | fprintf(stdout, "%s Initializing worker threads\n", | |
1542 | FLAGS_env->TimeToString(now / 1000000).c_str()); | |
7c673cae FG |
1543 | std::vector<ThreadState*> threads(n); |
1544 | for (uint32_t i = 0; i < n; i++) { | |
1545 | threads[i] = new ThreadState(i, &shared); | |
1546 | FLAGS_env->StartThread(ThreadBody, threads[i]); | |
1547 | } | |
1548 | ThreadState bg_thread(0, &shared); | |
1549 | if (FLAGS_compaction_thread_pool_adjust_interval > 0) { | |
1550 | FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread); | |
1551 | } | |
1552 | ||
1553 | // Each thread goes through the following states: | |
1554 | // initializing -> wait for others to init -> read/populate/depopulate | |
1555 | // wait for others to operate -> verify -> done | |
1556 | ||
1557 | { | |
1558 | MutexLock l(shared.GetMutex()); | |
1559 | while (!shared.AllInitialized()) { | |
1560 | shared.GetCondVar()->Wait(); | |
1561 | } | |
11fdf7f2 TL |
1562 | if (shared.ShouldVerifyAtBeginning()) { |
1563 | if (shared.HasVerificationFailedYet()) { | |
1564 | printf("Crash-recovery verification failed :(\n"); | |
1565 | } else { | |
1566 | printf("Crash-recovery verification passed :)\n"); | |
1567 | } | |
1568 | } | |
7c673cae | 1569 | |
11fdf7f2 | 1570 | now = FLAGS_env->NowMicros(); |
7c673cae FG |
1571 | fprintf(stdout, "%s Starting database operations\n", |
1572 | FLAGS_env->TimeToString(now/1000000).c_str()); | |
1573 | ||
1574 | shared.SetStart(); | |
1575 | shared.GetCondVar()->SignalAll(); | |
1576 | while (!shared.AllOperated()) { | |
1577 | shared.GetCondVar()->Wait(); | |
1578 | } | |
1579 | ||
1580 | now = FLAGS_env->NowMicros(); | |
1581 | if (FLAGS_test_batches_snapshots) { | |
1582 | fprintf(stdout, "%s Limited verification already done during gets\n", | |
1583 | FLAGS_env->TimeToString((uint64_t) now/1000000).c_str()); | |
1584 | } else { | |
1585 | fprintf(stdout, "%s Starting verification\n", | |
1586 | FLAGS_env->TimeToString((uint64_t) now/1000000).c_str()); | |
1587 | } | |
1588 | ||
1589 | shared.SetStartVerify(); | |
1590 | shared.GetCondVar()->SignalAll(); | |
1591 | while (!shared.AllDone()) { | |
1592 | shared.GetCondVar()->Wait(); | |
1593 | } | |
1594 | } | |
1595 | ||
1596 | for (unsigned int i = 1; i < n; i++) { | |
1597 | threads[0]->stats.Merge(threads[i]->stats); | |
1598 | } | |
1599 | threads[0]->stats.Report("Stress Test"); | |
1600 | ||
1601 | for (unsigned int i = 0; i < n; i++) { | |
1602 | delete threads[i]; | |
1603 | threads[i] = nullptr; | |
1604 | } | |
11fdf7f2 TL |
1605 | now = FLAGS_env->NowMicros(); |
1606 | if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) { | |
7c673cae FG |
1607 | fprintf(stdout, "%s Verification successful\n", |
1608 | FLAGS_env->TimeToString(now/1000000).c_str()); | |
1609 | } | |
1610 | PrintStatistics(); | |
1611 | ||
1612 | if (FLAGS_compaction_thread_pool_adjust_interval > 0) { | |
1613 | MutexLock l(shared.GetMutex()); | |
1614 | shared.SetShouldStopBgThread(); | |
1615 | while (!shared.BgThreadFinished()) { | |
1616 | shared.GetCondVar()->Wait(); | |
1617 | } | |
1618 | } | |
1619 | ||
1620 | if (shared.HasVerificationFailedYet()) { | |
1621 | printf("Verification failed :(\n"); | |
1622 | return false; | |
1623 | } | |
1624 | return true; | |
1625 | } | |
1626 | ||
11fdf7f2 | 1627 | protected: |
7c673cae FG |
1628 | static void ThreadBody(void* v) { |
1629 | ThreadState* thread = reinterpret_cast<ThreadState*>(v); | |
1630 | SharedState* shared = thread->shared; | |
1631 | ||
11fdf7f2 TL |
1632 | if (shared->ShouldVerifyAtBeginning()) { |
1633 | thread->shared->GetStressTest()->VerifyDb(thread); | |
1634 | } | |
7c673cae FG |
1635 | { |
1636 | MutexLock l(shared->GetMutex()); | |
1637 | shared->IncInitialized(); | |
1638 | if (shared->AllInitialized()) { | |
1639 | shared->GetCondVar()->SignalAll(); | |
1640 | } | |
1641 | while (!shared->Started()) { | |
1642 | shared->GetCondVar()->Wait(); | |
1643 | } | |
1644 | } | |
1645 | thread->shared->GetStressTest()->OperateDb(thread); | |
1646 | ||
1647 | { | |
1648 | MutexLock l(shared->GetMutex()); | |
1649 | shared->IncOperated(); | |
1650 | if (shared->AllOperated()) { | |
1651 | shared->GetCondVar()->SignalAll(); | |
1652 | } | |
1653 | while (!shared->VerifyStarted()) { | |
1654 | shared->GetCondVar()->Wait(); | |
1655 | } | |
1656 | } | |
1657 | ||
11fdf7f2 | 1658 | thread->shared->GetStressTest()->VerifyDb(thread); |
7c673cae FG |
1659 | |
1660 | { | |
1661 | MutexLock l(shared->GetMutex()); | |
1662 | shared->IncDone(); | |
1663 | if (shared->AllDone()) { | |
1664 | shared->GetCondVar()->SignalAll(); | |
1665 | } | |
1666 | } | |
7c673cae FG |
1667 | } |
1668 | ||
1669 | static void PoolSizeChangeThread(void* v) { | |
1670 | assert(FLAGS_compaction_thread_pool_adjust_interval > 0); | |
1671 | ThreadState* thread = reinterpret_cast<ThreadState*>(v); | |
1672 | SharedState* shared = thread->shared; | |
1673 | ||
1674 | while (true) { | |
1675 | { | |
1676 | MutexLock l(shared->GetMutex()); | |
1677 | if (shared->ShoudStopBgThread()) { | |
1678 | shared->SetBgThreadFinish(); | |
1679 | shared->GetCondVar()->SignalAll(); | |
1680 | return; | |
1681 | } | |
1682 | } | |
1683 | ||
1684 | auto thread_pool_size_base = FLAGS_max_background_compactions; | |
1685 | auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations; | |
1686 | int new_thread_pool_size = | |
1687 | thread_pool_size_base - thread_pool_size_var + | |
1688 | thread->rand.Next() % (thread_pool_size_var * 2 + 1); | |
1689 | if (new_thread_pool_size < 1) { | |
1690 | new_thread_pool_size = 1; | |
1691 | } | |
1692 | FLAGS_env->SetBackgroundThreads(new_thread_pool_size); | |
1693 | // Sleep up to 3 seconds | |
1694 | FLAGS_env->SleepForMicroseconds( | |
1695 | thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * | |
1696 | 1000 + | |
1697 | 1); | |
1698 | } | |
1699 | } | |
1700 | ||
11fdf7f2 TL |
1701 | static void PrintKeyValue(int cf, uint64_t key, const char* value, |
1702 | size_t sz) { | |
1703 | if (!FLAGS_verbose) { | |
1704 | return; | |
7c673cae | 1705 | } |
11fdf7f2 TL |
1706 | std::string tmp; |
1707 | tmp.reserve(sz * 2 + 16); | |
1708 | char buf[4]; | |
1709 | for (size_t i = 0; i < sz; i++) { | |
1710 | snprintf(buf, 4, "%X", value[i]); | |
1711 | tmp.append(buf); | |
7c673cae | 1712 | } |
11fdf7f2 TL |
1713 | fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf, |
1714 | key, sz, tmp.c_str()); | |
1715 | } | |
7c673cae | 1716 | |
11fdf7f2 TL |
1717 | static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) { |
1718 | const double completed_ratio = | |
1719 | static_cast<double>(iteration) / FLAGS_ops_per_thread; | |
1720 | const int64_t base_key = static_cast<int64_t>( | |
1721 | completed_ratio * (FLAGS_max_key - FLAGS_active_width)); | |
1722 | return base_key + thread->rand.Next() % FLAGS_active_width; | |
7c673cae FG |
1723 | } |
1724 | ||
11fdf7f2 TL |
1725 | static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) { |
1726 | size_t value_sz = | |
1727 | ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; | |
1728 | assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t)); | |
1729 | (void) max_sz; | |
1730 | *((uint32_t*)v) = rand; | |
1731 | for (size_t i=sizeof(uint32_t); i < value_sz; i++) { | |
1732 | v[i] = (char)(rand ^ i); | |
1733 | } | |
1734 | v[value_sz] = '\0'; | |
1735 | return value_sz; // the size of the value set. | |
1736 | } | |
7c673cae | 1737 | |
11fdf7f2 TL |
1738 | Status AssertSame(DB* db, ColumnFamilyHandle* cf, |
1739 | ThreadState::SnapshotState& snap_state) { | |
7c673cae | 1740 | Status s; |
11fdf7f2 TL |
1741 | if (cf->GetName() != snap_state.cf_at_name) { |
1742 | return s; | |
1743 | } | |
1744 | ReadOptions ropt; | |
1745 | ropt.snapshot = snap_state.snapshot; | |
1746 | PinnableSlice exp_v(&snap_state.value); | |
1747 | exp_v.PinSelf(); | |
1748 | PinnableSlice v; | |
1749 | s = db->Get(ropt, cf, snap_state.key, &v); | |
1750 | if (!s.ok() && !s.IsNotFound()) { | |
1751 | return s; | |
1752 | } | |
1753 | if (snap_state.status != s) { | |
1754 | return Status::Corruption( | |
1755 | "The snapshot gave inconsistent results for key " + | |
1756 | ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) + | |
1757 | " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() + | |
1758 | ") vs. (" + s.ToString() + ")"); | |
7c673cae | 1759 | } |
11fdf7f2 TL |
1760 | if (s.ok()) { |
1761 | if (exp_v != v) { | |
1762 | return Status::Corruption("The snapshot gave inconsistent values: (" + | |
1763 | exp_v.ToString() + ") vs. (" + v.ToString() + | |
1764 | ")"); | |
1765 | } | |
1766 | } | |
1767 | if (snap_state.key_vec != nullptr) { | |
494da23a TL |
1768 | // When `prefix_extractor` is set, seeking to beginning and scanning |
1769 | // across prefixes are only supported with `total_order_seek` set. | |
1770 | ropt.total_order_seek = true; | |
11fdf7f2 TL |
1771 | std::unique_ptr<Iterator> iterator(db->NewIterator(ropt)); |
1772 | std::unique_ptr<std::vector<bool>> tmp_bitvec(new std::vector<bool>(FLAGS_max_key)); | |
1773 | for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { | |
1774 | uint64_t key_val; | |
1775 | if (GetIntVal(iterator->key().ToString(), &key_val)) { | |
1776 | (*tmp_bitvec.get())[key_val] = true; | |
1777 | } | |
1778 | } | |
1779 | if (!std::equal(snap_state.key_vec->begin(), | |
1780 | snap_state.key_vec->end(), | |
1781 | tmp_bitvec.get()->begin())) { | |
1782 | return Status::Corruption("Found inconsistent keys at this snapshot"); | |
1783 | } | |
1784 | } | |
1785 | return Status::OK(); | |
1786 | } | |
7c673cae | 1787 | |
494da23a TL |
1788 | // Currently PreloadDb has to be single-threaded. |
1789 | void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, | |
1790 | SharedState* shared) { | |
1791 | WriteOptions write_opts; | |
1792 | write_opts.disableWAL = FLAGS_disable_wal; | |
1793 | if (FLAGS_sync) { | |
1794 | write_opts.sync = true; | |
1795 | } | |
1796 | char value[100]; | |
1797 | int cf_idx = 0; | |
1798 | Status s; | |
1799 | for (auto cfh : column_families_) { | |
1800 | for (int64_t k = 0; k != number_of_keys; ++k) { | |
1801 | std::string key_str = Key(k); | |
1802 | Slice key = key_str; | |
1803 | size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value)); | |
1804 | Slice v(value, sz); | |
1805 | shared->Put(cf_idx, k, 0, true /* pending */); | |
1806 | ||
1807 | if (FLAGS_use_merge) { | |
1808 | if (!FLAGS_use_txn) { | |
1809 | s = db_->Merge(write_opts, cfh, key, v); | |
1810 | } else { | |
1811 | #ifndef ROCKSDB_LITE | |
1812 | Transaction* txn; | |
1813 | s = NewTxn(write_opts, &txn); | |
1814 | if (s.ok()) { | |
1815 | s = txn->Merge(cfh, key, v); | |
1816 | if (s.ok()) { | |
1817 | s = CommitTxn(txn); | |
1818 | } | |
1819 | } | |
1820 | #endif | |
1821 | } | |
1822 | } else { | |
1823 | if (!FLAGS_use_txn) { | |
1824 | s = db_->Put(write_opts, cfh, key, v); | |
1825 | } else { | |
1826 | #ifndef ROCKSDB_LITE | |
1827 | Transaction* txn; | |
1828 | s = NewTxn(write_opts, &txn); | |
1829 | if (s.ok()) { | |
1830 | s = txn->Put(cfh, key, v); | |
1831 | if (s.ok()) { | |
1832 | s = CommitTxn(txn); | |
1833 | } | |
1834 | } | |
1835 | #endif | |
1836 | } | |
1837 | } | |
1838 | ||
1839 | shared->Put(cf_idx, k, 0, false /* pending */); | |
1840 | if (!s.ok()) { | |
1841 | break; | |
1842 | } | |
1843 | } | |
1844 | if (!s.ok()) { | |
1845 | break; | |
1846 | } | |
1847 | ++cf_idx; | |
1848 | } | |
1849 | if (s.ok()) { | |
1850 | s = db_->Flush(FlushOptions(), column_families_); | |
1851 | } | |
1852 | if (s.ok()) { | |
1853 | for (auto cf : column_families_) { | |
1854 | delete cf; | |
1855 | } | |
1856 | column_families_.clear(); | |
1857 | delete db_; | |
1858 | db_ = nullptr; | |
1859 | #ifndef ROCKSDB_LITE | |
1860 | txn_db_ = nullptr; | |
1861 | #endif | |
1862 | ||
1863 | db_preload_finished_.store(true); | |
1864 | auto now = FLAGS_env->NowMicros(); | |
1865 | fprintf(stdout, "%s Reopening database in read-only\n", | |
1866 | FLAGS_env->TimeToString(now / 1000000).c_str()); | |
1867 | // Reopen as read-only, can ignore all options related to updates | |
1868 | Open(); | |
1869 | } else { | |
1870 | fprintf(stderr, "Failed to preload db"); | |
1871 | exit(1); | |
1872 | } | |
1873 | } | |
1874 | ||
11fdf7f2 TL |
1875 | Status SetOptions(ThreadState* thread) { |
1876 | assert(FLAGS_set_options_one_in > 0); | |
1877 | std::unordered_map<std::string, std::string> opts; | |
1878 | std::string name = options_index_[ | |
1879 | thread->rand.Next() % options_index_.size()]; | |
1880 | int value_idx = thread->rand.Next() % options_table_[name].size(); | |
1881 | if (name == "soft_rate_limit" || name == "hard_rate_limit") { | |
1882 | opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx]; | |
1883 | opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx]; | |
1884 | } else if (name == "level0_file_num_compaction_trigger" || | |
1885 | name == "level0_slowdown_writes_trigger" || | |
1886 | name == "level0_stop_writes_trigger") { | |
1887 | opts["level0_file_num_compaction_trigger"] = | |
1888 | options_table_["level0_file_num_compaction_trigger"][value_idx]; | |
1889 | opts["level0_slowdown_writes_trigger"] = | |
1890 | options_table_["level0_slowdown_writes_trigger"][value_idx]; | |
1891 | opts["level0_stop_writes_trigger"] = | |
1892 | options_table_["level0_stop_writes_trigger"][value_idx]; | |
7c673cae | 1893 | } else { |
11fdf7f2 | 1894 | opts[name] = options_table_[name][value_idx]; |
7c673cae FG |
1895 | } |
1896 | ||
11fdf7f2 TL |
1897 | int rand_cf_idx = thread->rand.Next() % FLAGS_column_families; |
1898 | auto cfh = column_families_[rand_cf_idx]; | |
1899 | return db_->SetOptions(cfh, opts); | |
7c673cae FG |
1900 | } |
1901 | ||
11fdf7f2 TL |
1902 | #ifndef ROCKSDB_LITE |
1903 | Status NewTxn(WriteOptions& write_opts, Transaction** txn) { | |
1904 | if (!FLAGS_use_txn) { | |
1905 | return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); | |
1906 | } | |
1907 | static std::atomic<uint64_t> txn_id = {0}; | |
1908 | TransactionOptions txn_options; | |
1909 | *txn = txn_db_->BeginTransaction(write_opts, txn_options); | |
1910 | auto istr = std::to_string(txn_id.fetch_add(1)); | |
1911 | Status s = (*txn)->SetName("xid" + istr); | |
1912 | return s; | |
1913 | } | |
7c673cae | 1914 | |
11fdf7f2 TL |
1915 | Status CommitTxn(Transaction* txn) { |
1916 | if (!FLAGS_use_txn) { | |
1917 | return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); | |
7c673cae | 1918 | } |
11fdf7f2 TL |
1919 | Status s = txn->Prepare(); |
1920 | if (s.ok()) { | |
1921 | s = txn->Commit(); | |
7c673cae | 1922 | } |
11fdf7f2 | 1923 | delete txn; |
7c673cae FG |
1924 | return s; |
1925 | } | |
11fdf7f2 | 1926 | #endif |
7c673cae | 1927 | |
11fdf7f2 TL |
1928 | virtual void OperateDb(ThreadState* thread) { |
1929 | ReadOptions read_opts(FLAGS_verify_checksum, true); | |
1930 | WriteOptions write_opts; | |
1931 | auto shared = thread->shared; | |
1932 | char value[100]; | |
1933 | std::string from_db; | |
1934 | if (FLAGS_sync) { | |
1935 | write_opts.sync = true; | |
7c673cae FG |
1936 | } |
1937 | write_opts.disableWAL = FLAGS_disable_wal; | |
1938 | const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent; | |
1939 | const int writeBound = prefixBound + (int)FLAGS_writepercent; | |
1940 | const int delBound = writeBound + (int)FLAGS_delpercent; | |
1941 | const int delRangeBound = delBound + (int)FLAGS_delrangepercent; | |
1942 | ||
1943 | thread->stats.Start(); | |
1944 | for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { | |
1945 | if (thread->shared->HasVerificationFailedYet()) { | |
1946 | break; | |
1947 | } | |
1948 | if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) { | |
1949 | { | |
1950 | thread->stats.FinishedSingleOp(); | |
1951 | MutexLock l(thread->shared->GetMutex()); | |
11fdf7f2 TL |
1952 | while (!thread->snapshot_queue.empty()) { |
1953 | db_->ReleaseSnapshot( | |
1954 | thread->snapshot_queue.front().second.snapshot); | |
1955 | delete thread->snapshot_queue.front().second.key_vec; | |
1956 | thread->snapshot_queue.pop(); | |
1957 | } | |
7c673cae FG |
1958 | thread->shared->IncVotedReopen(); |
1959 | if (thread->shared->AllVotedReopen()) { | |
1960 | thread->shared->GetStressTest()->Reopen(); | |
1961 | thread->shared->GetCondVar()->SignalAll(); | |
494da23a | 1962 | } else { |
7c673cae FG |
1963 | thread->shared->GetCondVar()->Wait(); |
1964 | } | |
1965 | // Commenting this out as we don't want to reset stats on each open. | |
1966 | // thread->stats.Start(); | |
1967 | } | |
1968 | } | |
1969 | ||
1970 | // Change Options | |
1971 | if (FLAGS_set_options_one_in > 0 && | |
1972 | thread->rand.OneIn(FLAGS_set_options_one_in)) { | |
1973 | SetOptions(thread); | |
1974 | } | |
1975 | ||
1976 | if (FLAGS_set_in_place_one_in > 0 && | |
1977 | thread->rand.OneIn(FLAGS_set_in_place_one_in)) { | |
1978 | options_.inplace_update_support ^= options_.inplace_update_support; | |
1979 | } | |
1980 | ||
11fdf7f2 TL |
1981 | MaybeClearOneColumnFamily(thread); |
1982 | ||
1983 | #ifndef ROCKSDB_LITE | |
7c673cae FG |
1984 | if (FLAGS_compact_files_one_in > 0 && |
1985 | thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) { | |
1986 | auto* random_cf = | |
1987 | column_families_[thread->rand.Next() % FLAGS_column_families]; | |
1988 | rocksdb::ColumnFamilyMetaData cf_meta_data; | |
1989 | db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data); | |
1990 | ||
1991 | // Randomly compact up to three consecutive files from a level | |
1992 | const int kMaxRetry = 3; | |
1993 | for (int attempt = 0; attempt < kMaxRetry; ++attempt) { | |
1994 | size_t random_level = thread->rand.Uniform( | |
1995 | static_cast<int>(cf_meta_data.levels.size())); | |
1996 | ||
1997 | const auto& files = cf_meta_data.levels[random_level].files; | |
1998 | if (files.size() > 0) { | |
1999 | size_t random_file_index = | |
2000 | thread->rand.Uniform(static_cast<int>(files.size())); | |
2001 | if (files[random_file_index].being_compacted) { | |
2002 | // Retry as the selected file is currently being compacted | |
2003 | continue; | |
2004 | } | |
2005 | ||
2006 | std::vector<std::string> input_files; | |
2007 | input_files.push_back(files[random_file_index].name); | |
2008 | if (random_file_index > 0 && | |
2009 | !files[random_file_index - 1].being_compacted) { | |
2010 | input_files.push_back(files[random_file_index - 1].name); | |
2011 | } | |
2012 | if (random_file_index + 1 < files.size() && | |
2013 | !files[random_file_index + 1].being_compacted) { | |
2014 | input_files.push_back(files[random_file_index + 1].name); | |
2015 | } | |
2016 | ||
2017 | size_t output_level = | |
2018 | std::min(random_level + 1, cf_meta_data.levels.size() - 1); | |
2019 | auto s = | |
2020 | db_->CompactFiles(CompactionOptions(), random_cf, input_files, | |
2021 | static_cast<int>(output_level)); | |
2022 | if (!s.ok()) { | |
11fdf7f2 TL |
2023 | fprintf(stdout, "Unable to perform CompactFiles(): %s\n", |
2024 | s.ToString().c_str()); | |
7c673cae FG |
2025 | thread->stats.AddNumCompactFilesFailed(1); |
2026 | } else { | |
2027 | thread->stats.AddNumCompactFilesSucceed(1); | |
2028 | } | |
2029 | break; | |
2030 | } | |
2031 | } | |
2032 | } | |
2033 | #endif // !ROCKSDB_LITE | |
11fdf7f2 | 2034 | int64_t rand_key = GenerateOneKey(thread, i); |
7c673cae FG |
2035 | int rand_column_family = thread->rand.Next() % FLAGS_column_families; |
2036 | std::string keystr = Key(rand_key); | |
2037 | Slice key = keystr; | |
11fdf7f2 TL |
2038 | std::unique_ptr<MutexLock> lock; |
2039 | if (ShouldAcquireMutexOnKey()) { | |
2040 | lock.reset(new MutexLock( | |
7c673cae FG |
2041 | shared->GetMutexForKey(rand_column_family, rand_key))); |
2042 | } | |
11fdf7f2 | 2043 | |
7c673cae FG |
2044 | auto column_family = column_families_[rand_column_family]; |
2045 | ||
11fdf7f2 TL |
2046 | if (FLAGS_compact_range_one_in > 0 && |
2047 | thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) { | |
2048 | int64_t end_key_num; | |
2049 | if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) { | |
2050 | end_key_num = port::kMaxInt64; | |
2051 | } else { | |
2052 | end_key_num = FLAGS_compact_range_width + rand_key; | |
2053 | } | |
2054 | std::string end_key_buf = Key(end_key_num); | |
2055 | Slice end_key(end_key_buf); | |
2056 | ||
2057 | CompactRangeOptions cro; | |
2058 | cro.exclusive_manual_compaction = | |
2059 | static_cast<bool>(thread->rand.Next() % 2); | |
2060 | Status status = db_->CompactRange(cro, column_family, &key, &end_key); | |
2061 | if (!status.ok()) { | |
2062 | printf("Unable to perform CompactRange(): %s\n", | |
2063 | status.ToString().c_str()); | |
2064 | } | |
2065 | } | |
2066 | ||
2067 | std::vector<int> rand_column_families = | |
2068 | GenerateColumnFamilies(FLAGS_column_families, rand_column_family); | |
494da23a TL |
2069 | |
2070 | if (FLAGS_flush_one_in > 0 && | |
2071 | thread->rand.Uniform(FLAGS_flush_one_in) == 0) { | |
2072 | FlushOptions flush_opts; | |
2073 | std::vector<ColumnFamilyHandle*> cfhs; | |
2074 | std::for_each( | |
2075 | rand_column_families.begin(), rand_column_families.end(), | |
2076 | [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); }); | |
2077 | Status status = db_->Flush(flush_opts, cfhs); | |
2078 | if (!status.ok()) { | |
2079 | fprintf(stdout, "Unable to perform Flush(): %s\n", | |
2080 | status.ToString().c_str()); | |
2081 | } | |
2082 | } | |
2083 | ||
11fdf7f2 TL |
2084 | std::vector<int64_t> rand_keys = GenerateKeys(rand_key); |
2085 | ||
2086 | if (FLAGS_ingest_external_file_one_in > 0 && | |
2087 | thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) { | |
2088 | TestIngestExternalFile(thread, rand_column_families, rand_keys, lock); | |
2089 | } | |
2090 | ||
494da23a TL |
2091 | if (FLAGS_backup_one_in > 0 && |
2092 | thread->rand.Uniform(FLAGS_backup_one_in) == 0) { | |
2093 | Status s = TestBackupRestore(thread, rand_column_families, rand_keys); | |
2094 | if (!s.ok()) { | |
2095 | VerificationAbort(shared, "Backup/restore gave inconsistent state", | |
2096 | s); | |
2097 | } | |
2098 | } | |
2099 | ||
2100 | if (FLAGS_checkpoint_one_in > 0 && | |
2101 | thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) { | |
2102 | Status s = TestCheckpoint(thread, rand_column_families, rand_keys); | |
2103 | if (!s.ok()) { | |
2104 | VerificationAbort(shared, "Checkpoint gave inconsistent state", s); | |
2105 | } | |
2106 | } | |
2107 | ||
11fdf7f2 TL |
2108 | if (FLAGS_acquire_snapshot_one_in > 0 && |
2109 | thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { | |
2110 | auto snapshot = db_->GetSnapshot(); | |
2111 | ReadOptions ropt; | |
2112 | ropt.snapshot = snapshot; | |
2113 | std::string value_at; | |
2114 | // When taking a snapshot, we also read a key from that snapshot. We | |
2115 | // will later read the same key before releasing the snapshot and verify | |
2116 | // that the results are the same. | |
2117 | auto status_at = db_->Get(ropt, column_family, key, &value_at); | |
2118 | std::vector<bool> *key_vec = nullptr; | |
2119 | ||
2120 | if (FLAGS_compare_full_db_state_snapshot && | |
2121 | (thread->tid == 0)) { | |
2122 | key_vec = new std::vector<bool>(FLAGS_max_key); | |
494da23a TL |
2123 | // When `prefix_extractor` is set, seeking to beginning and scanning |
2124 | // across prefixes are only supported with `total_order_seek` set. | |
2125 | ropt.total_order_seek = true; | |
11fdf7f2 TL |
2126 | std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt)); |
2127 | for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { | |
2128 | uint64_t key_val; | |
2129 | if (GetIntVal(iterator->key().ToString(), &key_val)) { | |
2130 | (*key_vec)[key_val] = true; | |
2131 | } | |
2132 | } | |
2133 | } | |
2134 | ||
2135 | ThreadState::SnapshotState snap_state = { | |
2136 | snapshot, rand_column_family, column_family->GetName(), | |
2137 | keystr, status_at, value_at, key_vec}; | |
2138 | thread->snapshot_queue.emplace( | |
2139 | std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), | |
2140 | snap_state); | |
2141 | } | |
2142 | while (!thread->snapshot_queue.empty() && | |
2143 | i == thread->snapshot_queue.front().first) { | |
2144 | auto snap_state = thread->snapshot_queue.front().second; | |
2145 | assert(snap_state.snapshot); | |
2146 | // Note: this is unsafe as the cf might be dropped concurrently. But it | |
2147 | // is ok since unclean cf drop is cunnrently not supported by write | |
2148 | // prepared transactions. | |
2149 | Status s = | |
2150 | AssertSame(db_, column_families_[snap_state.cf_at], snap_state); | |
2151 | if (!s.ok()) { | |
2152 | VerificationAbort(shared, "Snapshot gave inconsistent state", s); | |
2153 | } | |
2154 | db_->ReleaseSnapshot(snap_state.snapshot); | |
2155 | delete snap_state.key_vec; | |
2156 | thread->snapshot_queue.pop(); | |
2157 | } | |
2158 | ||
7c673cae FG |
2159 | int prob_op = thread->rand.Uniform(100); |
2160 | if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { | |
2161 | // OPERATION read | |
11fdf7f2 | 2162 | TestGet(thread, read_opts, rand_column_families, rand_keys); |
7c673cae FG |
2163 | } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { |
2164 | // OPERATION prefix scan | |
2165 | // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are | |
2166 | // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will | |
2167 | // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same | |
2168 | // prefix | |
11fdf7f2 | 2169 | TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); |
7c673cae FG |
2170 | } else if (prefixBound <= prob_op && prob_op < writeBound) { |
2171 | // OPERATION write | |
11fdf7f2 TL |
2172 | TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, |
2173 | value, lock); | |
7c673cae FG |
2174 | } else if (writeBound <= prob_op && prob_op < delBound) { |
2175 | // OPERATION delete | |
11fdf7f2 | 2176 | TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); |
7c673cae FG |
2177 | } else if (delBound <= prob_op && prob_op < delRangeBound) { |
2178 | // OPERATION delete range | |
11fdf7f2 TL |
2179 | TestDeleteRange(thread, write_opts, rand_column_families, rand_keys, |
2180 | lock); | |
7c673cae FG |
2181 | } else { |
2182 | // OPERATION iterate | |
11fdf7f2 | 2183 | TestIterate(thread, read_opts, rand_column_families, rand_keys); |
7c673cae FG |
2184 | } |
2185 | thread->stats.FinishedSingleOp(); | |
2186 | } | |
2187 | ||
2188 | thread->stats.Stop(); | |
2189 | } | |
2190 | ||
11fdf7f2 TL |
2191 | virtual void VerifyDb(ThreadState* thread) const = 0; |
2192 | ||
2193 | virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {} | |
2194 | ||
2195 | virtual bool ShouldAcquireMutexOnKey() const { return false; } | |
2196 | ||
2197 | virtual std::vector<int> GenerateColumnFamilies( | |
2198 | const int /* num_column_families */, int rand_column_family) const { | |
2199 | return {rand_column_family}; | |
2200 | } | |
2201 | ||
2202 | virtual std::vector<int64_t> GenerateKeys(int64_t rand_key) const { | |
2203 | return {rand_key}; | |
2204 | } | |
2205 | ||
2206 | virtual Status TestGet(ThreadState* thread, | |
2207 | const ReadOptions& read_opts, | |
2208 | const std::vector<int>& rand_column_families, | |
2209 | const std::vector<int64_t>& rand_keys) = 0; | |
2210 | ||
2211 | virtual Status TestPrefixScan(ThreadState* thread, | |
2212 | const ReadOptions& read_opts, | |
2213 | const std::vector<int>& rand_column_families, | |
2214 | const std::vector<int64_t>& rand_keys) = 0; | |
2215 | ||
2216 | virtual Status TestPut(ThreadState* thread, | |
2217 | WriteOptions& write_opts, const ReadOptions& read_opts, | |
2218 | const std::vector<int>& cf_ids, const std::vector<int64_t>& keys, | |
2219 | char (&value)[100], std::unique_ptr<MutexLock>& lock) = 0; | |
2220 | ||
2221 | virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, | |
2222 | const std::vector<int>& rand_column_families, | |
2223 | const std::vector<int64_t>& rand_keys, | |
2224 | std::unique_ptr<MutexLock>& lock) = 0; | |
2225 | ||
2226 | virtual Status TestDeleteRange(ThreadState* thread, | |
2227 | WriteOptions& write_opts, | |
2228 | const std::vector<int>& rand_column_families, | |
2229 | const std::vector<int64_t>& rand_keys, | |
2230 | std::unique_ptr<MutexLock>& lock) = 0; | |
2231 | ||
2232 | virtual void TestIngestExternalFile( | |
2233 | ThreadState* thread, const std::vector<int>& rand_column_families, | |
2234 | const std::vector<int64_t>& rand_keys, | |
2235 | std::unique_ptr<MutexLock>& lock) = 0; | |
2236 | ||
2237 | // Given a key K, this creates an iterator which scans to K and then | |
2238 | // does a random sequence of Next/Prev operations. | |
2239 | virtual Status TestIterate(ThreadState* thread, | |
2240 | const ReadOptions& read_opts, | |
2241 | const std::vector<int>& rand_column_families, | |
2242 | const std::vector<int64_t>& rand_keys) { | |
2243 | Status s; | |
2244 | const Snapshot* snapshot = db_->GetSnapshot(); | |
2245 | ReadOptions readoptionscopy = read_opts; | |
2246 | readoptionscopy.snapshot = snapshot; | |
2247 | ||
2248 | std::string upper_bound_str; | |
2249 | Slice upper_bound; | |
2250 | if (thread->rand.OneIn(16)) { | |
2251 | // in 1/16 chance, set a iterator upper bound | |
2252 | int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread); | |
2253 | upper_bound_str = Key(rand_upper_key); | |
2254 | upper_bound = Slice(upper_bound_str); | |
2255 | // uppder_bound can be smaller than seek key, but the query itself | |
2256 | // should not crash either. | |
2257 | readoptionscopy.iterate_upper_bound = &upper_bound; | |
2258 | } | |
2259 | std::string lower_bound_str; | |
2260 | Slice lower_bound; | |
2261 | if (thread->rand.OneIn(16)) { | |
2262 | // in 1/16 chance, set a iterator lower bound | |
2263 | int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread); | |
2264 | lower_bound_str = Key(rand_lower_key); | |
2265 | lower_bound = Slice(lower_bound_str); | |
2266 | // uppder_bound can be smaller than seek key, but the query itself | |
2267 | // should not crash either. | |
2268 | readoptionscopy.iterate_lower_bound = &lower_bound; | |
2269 | } | |
2270 | ||
2271 | auto cfh = column_families_[rand_column_families[0]]; | |
2272 | std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh)); | |
2273 | ||
2274 | std::string key_str = Key(rand_keys[0]); | |
2275 | Slice key = key_str; | |
2276 | iter->Seek(key); | |
2277 | for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) { | |
2278 | if (thread->rand.OneIn(2)) { | |
2279 | iter->Next(); | |
2280 | } else { | |
2281 | iter->Prev(); | |
7c673cae | 2282 | } |
7c673cae FG |
2283 | } |
2284 | ||
2285 | if (s.ok()) { | |
11fdf7f2 | 2286 | thread->stats.AddIterations(1); |
7c673cae | 2287 | } else { |
11fdf7f2 | 2288 | thread->stats.AddErrors(1); |
7c673cae | 2289 | } |
11fdf7f2 TL |
2290 | |
2291 | db_->ReleaseSnapshot(snapshot); | |
2292 | ||
2293 | return s; | |
7c673cae FG |
2294 | } |
2295 | ||
494da23a TL |
2296 | #ifdef ROCKSDB_LITE |
2297 | virtual Status TestBackupRestore( | |
2298 | ThreadState* /* thread */, | |
2299 | const std::vector<int>& /* rand_column_families */, | |
2300 | const std::vector<int64_t>& /* rand_keys */) { | |
2301 | assert(false); | |
2302 | fprintf(stderr, | |
2303 | "RocksDB lite does not support " | |
2304 | "TestBackupRestore\n"); | |
2305 | std::terminate(); | |
2306 | } | |
2307 | ||
2308 | virtual Status TestCheckpoint( | |
2309 | ThreadState* /* thread */, | |
2310 | const std::vector<int>& /* rand_column_families */, | |
2311 | const std::vector<int64_t>& /* rand_keys */) { | |
2312 | assert(false); | |
2313 | fprintf(stderr, | |
2314 | "RocksDB lite does not support " | |
2315 | "TestCheckpoint\n"); | |
2316 | std::terminate(); | |
2317 | } | |
2318 | #else // ROCKSDB_LITE | |
2319 | virtual Status TestBackupRestore(ThreadState* thread, | |
2320 | const std::vector<int>& rand_column_families, | |
2321 | const std::vector<int64_t>& rand_keys) { | |
2322 | // Note the column families chosen by `rand_column_families` cannot be | |
2323 | // dropped while the locks for `rand_keys` are held. So we should not have | |
2324 | // to worry about accessing those column families throughout this function. | |
2325 | assert(rand_column_families.size() == rand_keys.size()); | |
2326 | std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid); | |
2327 | std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid); | |
2328 | BackupableDBOptions backup_opts(backup_dir); | |
2329 | BackupEngine* backup_engine = nullptr; | |
2330 | Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine); | |
2331 | if (s.ok()) { | |
2332 | s = backup_engine->CreateNewBackup(db_); | |
2333 | } | |
2334 | if (s.ok()) { | |
2335 | delete backup_engine; | |
2336 | backup_engine = nullptr; | |
2337 | s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine); | |
2338 | } | |
2339 | if (s.ok()) { | |
2340 | s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */, | |
2341 | restore_dir /* wal_dir */); | |
2342 | } | |
2343 | if (s.ok()) { | |
2344 | s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */); | |
2345 | } | |
2346 | DB* restored_db = nullptr; | |
2347 | std::vector<ColumnFamilyHandle*> restored_cf_handles; | |
2348 | if (s.ok()) { | |
2349 | Options restore_options(options_); | |
2350 | restore_options.listeners.clear(); | |
2351 | std::vector<ColumnFamilyDescriptor> cf_descriptors; | |
2352 | // TODO(ajkr): `column_family_names_` is not safe to access here when | |
2353 | // `clear_column_family_one_in != 0`. But we can't easily switch to | |
2354 | // `ListColumnFamilies` to get names because it won't necessarily give | |
2355 | // the same order as `column_family_names_`. | |
2356 | assert(FLAGS_clear_column_family_one_in == 0); | |
2357 | for (auto name : column_family_names_) { | |
2358 | cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options)); | |
2359 | } | |
2360 | s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors, | |
2361 | &restored_cf_handles, &restored_db); | |
2362 | } | |
2363 | // for simplicity, currently only verifies existence/non-existence of a few | |
2364 | // keys | |
2365 | for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) { | |
2366 | std::string key_str = Key(rand_keys[i]); | |
2367 | Slice key = key_str; | |
2368 | std::string restored_value; | |
2369 | Status get_status = restored_db->Get( | |
2370 | ReadOptions(), restored_cf_handles[rand_column_families[i]], key, | |
2371 | &restored_value); | |
2372 | bool exists = | |
2373 | thread->shared->Exists(rand_column_families[i], rand_keys[i]); | |
2374 | if (get_status.ok()) { | |
2375 | if (!exists) { | |
2376 | s = Status::Corruption( | |
2377 | "key exists in restore but not in original db"); | |
2378 | } | |
2379 | } else if (get_status.IsNotFound()) { | |
2380 | if (exists) { | |
2381 | s = Status::Corruption( | |
2382 | "key exists in original db but not in restore"); | |
2383 | } | |
2384 | } else { | |
2385 | s = get_status; | |
2386 | } | |
2387 | } | |
2388 | if (backup_engine != nullptr) { | |
2389 | delete backup_engine; | |
2390 | backup_engine = nullptr; | |
2391 | } | |
2392 | if (restored_db != nullptr) { | |
2393 | for (auto* cf_handle : restored_cf_handles) { | |
2394 | restored_db->DestroyColumnFamilyHandle(cf_handle); | |
2395 | } | |
2396 | delete restored_db; | |
2397 | restored_db = nullptr; | |
2398 | } | |
2399 | if (!s.ok()) { | |
2400 | printf("A backup/restore operation failed with: %s\n", | |
2401 | s.ToString().c_str()); | |
2402 | } | |
2403 | return s; | |
2404 | } | |
2405 | ||
2406 | virtual Status TestCheckpoint(ThreadState* thread, | |
2407 | const std::vector<int>& rand_column_families, | |
2408 | const std::vector<int64_t>& rand_keys) { | |
2409 | // Note the column families chosen by `rand_column_families` cannot be | |
2410 | // dropped while the locks for `rand_keys` are held. So we should not have | |
2411 | // to worry about accessing those column families throughout this function. | |
2412 | assert(rand_column_families.size() == rand_keys.size()); | |
2413 | std::string checkpoint_dir = | |
2414 | FLAGS_db + "/.checkpoint" + ToString(thread->tid); | |
2415 | DestroyDB(checkpoint_dir, Options()); | |
2416 | Checkpoint* checkpoint = nullptr; | |
2417 | Status s = Checkpoint::Create(db_, &checkpoint); | |
2418 | if (s.ok()) { | |
2419 | s = checkpoint->CreateCheckpoint(checkpoint_dir); | |
2420 | } | |
2421 | std::vector<ColumnFamilyHandle*> cf_handles; | |
2422 | DB* checkpoint_db = nullptr; | |
2423 | if (s.ok()) { | |
2424 | delete checkpoint; | |
2425 | checkpoint = nullptr; | |
2426 | Options options(options_); | |
2427 | options.listeners.clear(); | |
2428 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
2429 | // TODO(ajkr): `column_family_names_` is not safe to access here when | |
2430 | // `clear_column_family_one_in != 0`. But we can't easily switch to | |
2431 | // `ListColumnFamilies` to get names because it won't necessarily give | |
2432 | // the same order as `column_family_names_`. | |
2433 | if (FLAGS_clear_column_family_one_in == 0) { | |
2434 | for (const auto& name : column_family_names_) { | |
2435 | cf_descs.emplace_back(name, ColumnFamilyOptions(options)); | |
2436 | } | |
2437 | s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs, | |
2438 | &cf_handles, &checkpoint_db); | |
2439 | } | |
2440 | } | |
2441 | if (checkpoint_db != nullptr) { | |
2442 | for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) { | |
2443 | std::string key_str = Key(rand_keys[i]); | |
2444 | Slice key = key_str; | |
2445 | std::string value; | |
2446 | Status get_status = checkpoint_db->Get( | |
2447 | ReadOptions(), cf_handles[rand_column_families[i]], key, &value); | |
2448 | bool exists = | |
2449 | thread->shared->Exists(rand_column_families[i], rand_keys[i]); | |
2450 | if (get_status.ok()) { | |
2451 | if (!exists) { | |
2452 | s = Status::Corruption( | |
2453 | "key exists in checkpoint but not in original db"); | |
2454 | } | |
2455 | } else if (get_status.IsNotFound()) { | |
2456 | if (exists) { | |
2457 | s = Status::Corruption( | |
2458 | "key exists in original db but not in checkpoint"); | |
2459 | } | |
2460 | } else { | |
2461 | s = get_status; | |
2462 | } | |
2463 | } | |
2464 | for (auto cfh : cf_handles) { | |
2465 | delete cfh; | |
2466 | } | |
2467 | cf_handles.clear(); | |
2468 | delete checkpoint_db; | |
2469 | checkpoint_db = nullptr; | |
2470 | } | |
2471 | DestroyDB(checkpoint_dir, Options()); | |
2472 | if (!s.ok()) { | |
2473 | fprintf(stderr, "A checkpoint operation failed with: %s\n", | |
2474 | s.ToString().c_str()); | |
2475 | } | |
2476 | return s; | |
2477 | } | |
2478 | #endif // ROCKSDB_LITE | |
2479 | ||
11fdf7f2 TL |
2480 | void VerificationAbort(SharedState* shared, std::string msg, Status s) const { |
2481 | printf("Verification failed: %s. Status is %s\n", msg.c_str(), | |
2482 | s.ToString().c_str()); | |
2483 | shared->SetVerificationFailure(); | |
7c673cae FG |
2484 | } |
2485 | ||
11fdf7f2 TL |
2486 | void VerificationAbort(SharedState* shared, std::string msg, int cf, |
2487 | int64_t key) const { | |
2488 | printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, key, | |
2489 | msg.c_str()); | |
2490 | shared->SetVerificationFailure(); | |
7c673cae FG |
2491 | } |
2492 | ||
2493 | void PrintEnv() const { | |
2494 | fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, | |
2495 | kMinorVersion); | |
11fdf7f2 TL |
2496 | fprintf(stdout, "Format version : %d\n", FLAGS_format_version); |
2497 | fprintf(stdout, "TransactionDB : %s\n", | |
2498 | FLAGS_use_txn ? "true" : "false"); | |
494da23a TL |
2499 | fprintf(stdout, "Read only mode : %s\n", |
2500 | FLAGS_read_only ? "true" : "false"); | |
2501 | fprintf(stdout, "Atomic flush : %s\n", | |
2502 | FLAGS_atomic_flush ? "true" : "false"); | |
7c673cae FG |
2503 | fprintf(stdout, "Column families : %d\n", FLAGS_column_families); |
2504 | if (!FLAGS_test_batches_snapshots) { | |
2505 | fprintf(stdout, "Clear CFs one in : %d\n", | |
2506 | FLAGS_clear_column_family_one_in); | |
2507 | } | |
2508 | fprintf(stdout, "Number of threads : %d\n", FLAGS_threads); | |
2509 | fprintf(stdout, "Ops per thread : %lu\n", | |
2510 | (unsigned long)FLAGS_ops_per_thread); | |
2511 | std::string ttl_state("unused"); | |
2512 | if (FLAGS_ttl > 0) { | |
2513 | ttl_state = NumberToString(FLAGS_ttl); | |
2514 | } | |
2515 | fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str()); | |
2516 | fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent); | |
2517 | fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent); | |
2518 | fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent); | |
2519 | fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent); | |
2520 | fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent); | |
2521 | fprintf(stdout, "No overwrite percentage : %d%%\n", | |
2522 | FLAGS_nooverwritepercent); | |
2523 | fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent); | |
2524 | fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n", | |
2525 | FLAGS_db_write_buffer_size); | |
2526 | fprintf(stdout, "Write-buffer-size : %d\n", | |
2527 | FLAGS_write_buffer_size); | |
2528 | fprintf(stdout, "Iterations : %lu\n", | |
2529 | (unsigned long)FLAGS_num_iterations); | |
2530 | fprintf(stdout, "Max key : %lu\n", | |
2531 | (unsigned long)FLAGS_max_key); | |
2532 | fprintf(stdout, "Ratio #ops/#keys : %f\n", | |
2533 | (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key); | |
2534 | fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen); | |
2535 | fprintf(stdout, "Batches/snapshots : %d\n", | |
2536 | FLAGS_test_batches_snapshots); | |
2537 | fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update); | |
2538 | fprintf(stdout, "Num keys per lock : %d\n", | |
2539 | 1 << FLAGS_log2_keys_per_lock); | |
2540 | std::string compression = CompressionTypeToString(FLAGS_compression_type_e); | |
2541 | fprintf(stdout, "Compression : %s\n", compression.c_str()); | |
11fdf7f2 TL |
2542 | std::string checksum = ChecksumTypeToString(FLAGS_checksum_type_e); |
2543 | fprintf(stdout, "Checksum type : %s\n", checksum.c_str()); | |
7c673cae FG |
2544 | fprintf(stdout, "Max subcompactions : %" PRIu64 "\n", |
2545 | FLAGS_subcompactions); | |
2546 | ||
2547 | const char* memtablerep = ""; | |
2548 | switch (FLAGS_rep_factory) { | |
2549 | case kSkipList: | |
2550 | memtablerep = "skip_list"; | |
2551 | break; | |
2552 | case kHashSkipList: | |
2553 | memtablerep = "prefix_hash"; | |
2554 | break; | |
2555 | case kVectorRep: | |
2556 | memtablerep = "vector"; | |
2557 | break; | |
2558 | } | |
2559 | ||
2560 | fprintf(stdout, "Memtablerep : %s\n", memtablerep); | |
2561 | ||
2562 | fprintf(stdout, "Test kill odd : %d\n", rocksdb_kill_odds); | |
2563 | if (!rocksdb_kill_prefix_blacklist.empty()) { | |
2564 | fprintf(stdout, "Skipping kill points prefixes:\n"); | |
2565 | for (auto& p : rocksdb_kill_prefix_blacklist) { | |
2566 | fprintf(stdout, " %s\n", p.c_str()); | |
2567 | } | |
2568 | } | |
2569 | ||
2570 | fprintf(stdout, "------------------------------------------------\n"); | |
2571 | } | |
2572 | ||
2573 | void Open() { | |
2574 | assert(db_ == nullptr); | |
11fdf7f2 TL |
2575 | #ifndef ROCKSDB_LITE |
2576 | assert(txn_db_ == nullptr); | |
2577 | #endif | |
2578 | if (FLAGS_options_file.empty()) { | |
2579 | BlockBasedTableOptions block_based_options; | |
2580 | block_based_options.block_cache = cache_; | |
2581 | block_based_options.block_cache_compressed = compressed_cache_; | |
2582 | block_based_options.checksum = FLAGS_checksum_type_e; | |
2583 | block_based_options.block_size = FLAGS_block_size; | |
2584 | block_based_options.format_version = | |
2585 | static_cast<uint32_t>(FLAGS_format_version); | |
2586 | block_based_options.index_block_restart_interval = | |
2587 | static_cast<int32_t>(FLAGS_index_block_restart_interval); | |
2588 | block_based_options.filter_policy = filter_policy_; | |
2589 | options_.table_factory.reset( | |
2590 | NewBlockBasedTableFactory(block_based_options)); | |
2591 | options_.db_write_buffer_size = FLAGS_db_write_buffer_size; | |
2592 | options_.write_buffer_size = FLAGS_write_buffer_size; | |
2593 | options_.max_write_buffer_number = FLAGS_max_write_buffer_number; | |
2594 | options_.min_write_buffer_number_to_merge = | |
2595 | FLAGS_min_write_buffer_number_to_merge; | |
2596 | options_.max_write_buffer_number_to_maintain = | |
2597 | FLAGS_max_write_buffer_number_to_maintain; | |
2598 | options_.memtable_prefix_bloom_size_ratio = | |
2599 | FLAGS_memtable_prefix_bloom_size_ratio; | |
494da23a TL |
2600 | options_.memtable_whole_key_filtering = |
2601 | FLAGS_memtable_whole_key_filtering; | |
11fdf7f2 TL |
2602 | options_.max_background_compactions = FLAGS_max_background_compactions; |
2603 | options_.max_background_flushes = FLAGS_max_background_flushes; | |
2604 | options_.compaction_style = | |
2605 | static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style); | |
2606 | options_.prefix_extractor.reset( | |
2607 | NewFixedPrefixTransform(FLAGS_prefix_size)); | |
2608 | options_.max_open_files = FLAGS_open_files; | |
2609 | options_.statistics = dbstats; | |
2610 | options_.env = FLAGS_env; | |
2611 | options_.use_fsync = FLAGS_use_fsync; | |
2612 | options_.compaction_readahead_size = FLAGS_compaction_readahead_size; | |
2613 | options_.allow_mmap_reads = FLAGS_mmap_read; | |
2614 | options_.allow_mmap_writes = FLAGS_mmap_write; | |
2615 | options_.use_direct_reads = FLAGS_use_direct_reads; | |
2616 | options_.use_direct_io_for_flush_and_compaction = | |
2617 | FLAGS_use_direct_io_for_flush_and_compaction; | |
494da23a TL |
2618 | options_.recycle_log_file_num = |
2619 | static_cast<size_t>(FLAGS_recycle_log_file_num); | |
11fdf7f2 TL |
2620 | options_.target_file_size_base = FLAGS_target_file_size_base; |
2621 | options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier; | |
2622 | options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; | |
2623 | options_.max_bytes_for_level_multiplier = | |
2624 | FLAGS_max_bytes_for_level_multiplier; | |
2625 | options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger; | |
2626 | options_.level0_slowdown_writes_trigger = | |
2627 | FLAGS_level0_slowdown_writes_trigger; | |
2628 | options_.level0_file_num_compaction_trigger = | |
2629 | FLAGS_level0_file_num_compaction_trigger; | |
2630 | options_.compression = FLAGS_compression_type_e; | |
2631 | options_.compression_opts.max_dict_bytes = | |
2632 | FLAGS_compression_max_dict_bytes; | |
2633 | options_.compression_opts.zstd_max_train_bytes = | |
2634 | FLAGS_compression_zstd_max_train_bytes; | |
2635 | options_.create_if_missing = true; | |
2636 | options_.max_manifest_file_size = FLAGS_max_manifest_file_size; | |
2637 | options_.inplace_update_support = FLAGS_in_place_update; | |
2638 | options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions); | |
2639 | options_.allow_concurrent_memtable_write = | |
2640 | FLAGS_allow_concurrent_memtable_write; | |
2641 | options_.enable_pipelined_write = FLAGS_enable_pipelined_write; | |
2642 | options_.enable_write_thread_adaptive_yield = | |
2643 | FLAGS_enable_write_thread_adaptive_yield; | |
2644 | options_.compaction_options_universal.size_ratio = | |
2645 | FLAGS_universal_size_ratio; | |
2646 | options_.compaction_options_universal.min_merge_width = | |
2647 | FLAGS_universal_min_merge_width; | |
2648 | options_.compaction_options_universal.max_merge_width = | |
2649 | FLAGS_universal_max_merge_width; | |
2650 | options_.compaction_options_universal.max_size_amplification_percent = | |
2651 | FLAGS_universal_max_size_amplification_percent; | |
494da23a | 2652 | options_.atomic_flush = FLAGS_atomic_flush; |
11fdf7f2 TL |
2653 | } else { |
2654 | #ifdef ROCKSDB_LITE | |
2655 | fprintf(stderr, "--options_file not supported in lite mode\n"); | |
2656 | exit(1); | |
2657 | #else | |
2658 | DBOptions db_options; | |
2659 | std::vector<ColumnFamilyDescriptor> cf_descriptors; | |
2660 | Status s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), | |
2661 | &db_options, &cf_descriptors); | |
2662 | if (!s.ok()) { | |
2663 | fprintf(stderr, "Unable to load options file %s --- %s\n", | |
2664 | FLAGS_options_file.c_str(), s.ToString().c_str()); | |
2665 | exit(1); | |
2666 | } | |
2667 | options_ = Options(db_options, cf_descriptors[0].options); | |
2668 | #endif // ROCKSDB_LITE | |
2669 | } | |
2670 | ||
2671 | if (FLAGS_rate_limiter_bytes_per_sec > 0) { | |
2672 | options_.rate_limiter.reset(NewGenericRateLimiter( | |
2673 | FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */, | |
2674 | 10 /* fairness */, | |
2675 | FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly | |
2676 | : RateLimiter::Mode::kWritesOnly)); | |
2677 | if (FLAGS_rate_limit_bg_reads) { | |
2678 | options_.new_table_reader_for_compaction_inputs = true; | |
2679 | } | |
2680 | } | |
7c673cae FG |
2681 | |
2682 | if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) { | |
2683 | fprintf(stderr, | |
2684 | "prefeix_size cannot be zero if memtablerep == prefix_hash\n"); | |
2685 | exit(1); | |
2686 | } | |
2687 | if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) { | |
2688 | fprintf(stderr, | |
2689 | "WARNING: prefix_size is non-zero but " | |
2690 | "memtablerep != prefix_hash\n"); | |
2691 | } | |
2692 | switch (FLAGS_rep_factory) { | |
2693 | case kSkipList: | |
2694 | // no need to do anything | |
2695 | break; | |
2696 | #ifndef ROCKSDB_LITE | |
2697 | case kHashSkipList: | |
2698 | options_.memtable_factory.reset(NewHashSkipListRepFactory(10000)); | |
2699 | break; | |
2700 | case kVectorRep: | |
2701 | options_.memtable_factory.reset(new VectorRepFactory()); | |
2702 | break; | |
2703 | #else | |
2704 | default: | |
2705 | fprintf(stderr, | |
2706 | "RocksdbLite only supports skip list mem table. Skip " | |
2707 | "--rep_factory\n"); | |
2708 | #endif // ROCKSDB_LITE | |
2709 | } | |
2710 | ||
2711 | if (FLAGS_use_full_merge_v1) { | |
2712 | options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator(); | |
2713 | } else { | |
2714 | options_.merge_operator = MergeOperators::CreatePutOperator(); | |
2715 | } | |
2716 | ||
7c673cae FG |
2717 | fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); |
2718 | ||
2719 | Status s; | |
2720 | if (FLAGS_ttl == -1) { | |
2721 | std::vector<std::string> existing_column_families; | |
2722 | s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, | |
2723 | &existing_column_families); // ignore errors | |
2724 | if (!s.ok()) { | |
2725 | // DB doesn't exist | |
2726 | assert(existing_column_families.empty()); | |
2727 | assert(column_family_names_.empty()); | |
2728 | column_family_names_.push_back(kDefaultColumnFamilyName); | |
2729 | } else if (column_family_names_.empty()) { | |
2730 | // this is the first call to the function Open() | |
2731 | column_family_names_ = existing_column_families; | |
2732 | } else { | |
2733 | // this is a reopen. just assert that existing column_family_names are | |
2734 | // equivalent to what we remember | |
2735 | auto sorted_cfn = column_family_names_; | |
2736 | std::sort(sorted_cfn.begin(), sorted_cfn.end()); | |
2737 | std::sort(existing_column_families.begin(), | |
2738 | existing_column_families.end()); | |
2739 | if (sorted_cfn != existing_column_families) { | |
2740 | fprintf(stderr, | |
2741 | "Expected column families differ from the existing:\n"); | |
2742 | printf("Expected: {"); | |
2743 | for (auto cf : sorted_cfn) { | |
2744 | printf("%s ", cf.c_str()); | |
2745 | } | |
2746 | printf("}\n"); | |
2747 | printf("Existing: {"); | |
2748 | for (auto cf : existing_column_families) { | |
2749 | printf("%s ", cf.c_str()); | |
2750 | } | |
2751 | printf("}\n"); | |
2752 | } | |
2753 | assert(sorted_cfn == existing_column_families); | |
2754 | } | |
2755 | std::vector<ColumnFamilyDescriptor> cf_descriptors; | |
2756 | for (auto name : column_family_names_) { | |
2757 | if (name != kDefaultColumnFamilyName) { | |
2758 | new_column_family_name_ = | |
2759 | std::max(new_column_family_name_.load(), std::stoi(name) + 1); | |
2760 | } | |
2761 | cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); | |
2762 | } | |
2763 | while (cf_descriptors.size() < (size_t)FLAGS_column_families) { | |
2764 | std::string name = ToString(new_column_family_name_.load()); | |
2765 | new_column_family_name_++; | |
2766 | cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); | |
2767 | column_family_names_.push_back(name); | |
2768 | } | |
2769 | options_.listeners.clear(); | |
2770 | options_.listeners.emplace_back( | |
11fdf7f2 | 2771 | new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors)); |
7c673cae | 2772 | options_.create_missing_column_families = true; |
11fdf7f2 | 2773 | if (!FLAGS_use_txn) { |
494da23a TL |
2774 | if (db_preload_finished_.load() && FLAGS_read_only) { |
2775 | s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors, | |
2776 | &column_families_, &db_); | |
2777 | } else { | |
2778 | s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, | |
2779 | &column_families_, &db_); | |
2780 | } | |
11fdf7f2 TL |
2781 | } else { |
2782 | #ifndef ROCKSDB_LITE | |
2783 | TransactionDBOptions txn_db_options; | |
2784 | // For the moment it is sufficient to test WRITE_PREPARED policy | |
2785 | txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED; | |
2786 | s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, | |
2787 | cf_descriptors, &column_families_, &txn_db_); | |
2788 | db_ = txn_db_; | |
2789 | // after a crash, rollback to commit recovered transactions | |
2790 | std::vector<Transaction*> trans; | |
2791 | txn_db_->GetAllPreparedTransactions(&trans); | |
2792 | Random rand(static_cast<uint32_t>(FLAGS_seed)); | |
2793 | for (auto txn : trans) { | |
2794 | if (rand.OneIn(2)) { | |
2795 | s = txn->Commit(); | |
2796 | assert(s.ok()); | |
2797 | } else { | |
2798 | s = txn->Rollback(); | |
2799 | assert(s.ok()); | |
2800 | } | |
2801 | delete txn; | |
2802 | } | |
2803 | trans.clear(); | |
2804 | txn_db_->GetAllPreparedTransactions(&trans); | |
2805 | assert(trans.size() == 0); | |
2806 | #endif | |
2807 | } | |
7c673cae FG |
2808 | assert(!s.ok() || column_families_.size() == |
2809 | static_cast<size_t>(FLAGS_column_families)); | |
2810 | } else { | |
2811 | #ifndef ROCKSDB_LITE | |
2812 | DBWithTTL* db_with_ttl; | |
2813 | s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); | |
2814 | db_ = db_with_ttl; | |
2815 | #else | |
2816 | fprintf(stderr, "TTL is not supported in RocksDBLite\n"); | |
2817 | exit(1); | |
2818 | #endif | |
2819 | } | |
2820 | if (!s.ok()) { | |
2821 | fprintf(stderr, "open error: %s\n", s.ToString().c_str()); | |
2822 | exit(1); | |
2823 | } | |
2824 | } | |
2825 | ||
2826 | void Reopen() { | |
2827 | for (auto cf : column_families_) { | |
2828 | delete cf; | |
2829 | } | |
2830 | column_families_.clear(); | |
2831 | delete db_; | |
2832 | db_ = nullptr; | |
11fdf7f2 TL |
2833 | #ifndef ROCKSDB_LITE |
2834 | txn_db_ = nullptr; | |
2835 | #endif | |
7c673cae FG |
2836 | |
2837 | num_times_reopened_++; | |
2838 | auto now = FLAGS_env->NowMicros(); | |
2839 | fprintf(stdout, "%s Reopening database for the %dth time\n", | |
2840 | FLAGS_env->TimeToString(now/1000000).c_str(), | |
2841 | num_times_reopened_); | |
2842 | Open(); | |
2843 | } | |
2844 | ||
2845 | void PrintStatistics() { | |
2846 | if (dbstats) { | |
2847 | fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); | |
2848 | } | |
2849 | } | |
2850 | ||
7c673cae FG |
// Block cache, compressed block cache and filter policy that Open() installs
// into the block-based table options (when no --options_file is given).
std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_;
// The open database; set by Open() and deleted by Reopen(). When
// FLAGS_use_txn is set it points at the same object as txn_db_.
DB* db_;
#ifndef ROCKSDB_LITE
TransactionDB* txn_db_;
#endif
// Options used to open db_; also copied when opening restored backups and
// checkpoints.
Options options_;
// Column family handles, index-aligned with column_family_names_.
std::vector<ColumnFamilyHandle*> column_families_;
std::vector<std::string> column_family_names_;
// Next numeric name to give a newly created column family.
std::atomic<int> new_column_family_name_;
// Number of times Reopen() has been called.
int num_times_reopened_;
std::unordered_map<std::string, std::vector<std::string>> options_table_;
std::vector<std::string> options_index_;
// Once true, Open() honours FLAGS_read_only and opens the DB read-only.
std::atomic<bool> db_preload_finished_;
7c673cae FG |
2866 | }; |
2867 | ||
11fdf7f2 TL |
2868 | class NonBatchedOpsStressTest : public StressTest { |
2869 | public: | |
// Trivial constructor/destructor: nothing extra is set up or torn down here.
NonBatchedOpsStressTest() {}

virtual ~NonBatchedOpsStressTest() {}
7c673cae | 2873 | |
11fdf7f2 TL |
2874 | virtual void VerifyDb(ThreadState* thread) const { |
2875 | ReadOptions options(FLAGS_verify_checksum, true); | |
2876 | auto shared = thread->shared; | |
2877 | const int64_t max_key = shared->GetMaxKey(); | |
2878 | const int64_t keys_per_thread = max_key / shared->GetNumThreads(); | |
2879 | int64_t start = keys_per_thread * thread->tid; | |
2880 | int64_t end = start + keys_per_thread; | |
2881 | if (thread->tid == shared->GetNumThreads() - 1) { | |
2882 | end = max_key; | |
2883 | } | |
2884 | for (size_t cf = 0; cf < column_families_.size(); ++cf) { | |
2885 | if (thread->shared->HasVerificationFailedYet()) { | |
2886 | break; | |
2887 | } | |
2888 | if (!thread->rand.OneIn(2)) { | |
2889 | // Use iterator to verify this range | |
494da23a | 2890 | std::unique_ptr<Iterator> iter( |
11fdf7f2 TL |
2891 | db_->NewIterator(options, column_families_[cf])); |
2892 | iter->Seek(Key(start)); | |
2893 | for (auto i = start; i < end; i++) { | |
2894 | if (thread->shared->HasVerificationFailedYet()) { | |
2895 | break; | |
2896 | } | |
2897 | // TODO(ljin): update "long" to uint64_t | |
2898 | // Reseek when the prefix changes | |
2899 | if (i % (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size)) == | |
2900 | 0) { | |
2901 | iter->Seek(Key(i)); | |
2902 | } | |
2903 | std::string from_db; | |
2904 | std::string keystr = Key(i); | |
2905 | Slice k = keystr; | |
2906 | Status s = iter->status(); | |
2907 | if (iter->Valid()) { | |
2908 | if (iter->key().compare(k) > 0) { | |
2909 | s = Status::NotFound(Slice()); | |
2910 | } else if (iter->key().compare(k) == 0) { | |
2911 | from_db = iter->value().ToString(); | |
2912 | iter->Next(); | |
2913 | } else if (iter->key().compare(k) < 0) { | |
2914 | VerificationAbort(shared, "An out of range key was found", | |
2915 | static_cast<int>(cf), i); | |
2916 | } | |
2917 | } else { | |
2918 | // The iterator found no value for the key in question, so do not | |
2919 | // move to the next item in the iterator | |
2920 | s = Status::NotFound(Slice()); | |
2921 | } | |
2922 | VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s, | |
2923 | true); | |
2924 | if (from_db.length()) { | |
2925 | PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i), | |
2926 | from_db.data(), from_db.length()); | |
2927 | } | |
2928 | } | |
2929 | } else { | |
2930 | // Use Get to verify this range | |
2931 | for (auto i = start; i < end; i++) { | |
2932 | if (thread->shared->HasVerificationFailedYet()) { | |
2933 | break; | |
2934 | } | |
2935 | std::string from_db; | |
2936 | std::string keystr = Key(i); | |
2937 | Slice k = keystr; | |
2938 | Status s = db_->Get(options, column_families_[cf], k, &from_db); | |
2939 | VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s, | |
2940 | true); | |
2941 | if (from_db.length()) { | |
2942 | PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i), | |
2943 | from_db.data(), from_db.length()); | |
2944 | } | |
2945 | } | |
2946 | } | |
2947 | } | |
7c673cae | 2948 | } |
11fdf7f2 TL |
2949 | |
2950 | virtual void MaybeClearOneColumnFamily(ThreadState* thread) { | |
2951 | if (FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) { | |
2952 | if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) { | |
2953 | // drop column family and then create it again (can't drop default) | |
2954 | int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1; | |
2955 | std::string new_name = | |
2956 | ToString(new_column_family_name_.fetch_add(1)); | |
2957 | { | |
2958 | MutexLock l(thread->shared->GetMutex()); | |
2959 | fprintf( | |
2960 | stdout, | |
2961 | "[CF %d] Dropping and recreating column family. new name: %s\n", | |
2962 | cf, new_name.c_str()); | |
2963 | } | |
2964 | thread->shared->LockColumnFamily(cf); | |
2965 | Status s = db_->DropColumnFamily(column_families_[cf]); | |
2966 | delete column_families_[cf]; | |
2967 | if (!s.ok()) { | |
2968 | fprintf(stderr, "dropping column family error: %s\n", | |
2969 | s.ToString().c_str()); | |
2970 | std::terminate(); | |
2971 | } | |
2972 | s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, | |
2973 | &column_families_[cf]); | |
2974 | column_family_names_[cf] = new_name; | |
2975 | thread->shared->ClearColumnFamily(cf); | |
2976 | if (!s.ok()) { | |
2977 | fprintf(stderr, "creating column family error: %s\n", | |
2978 | s.ToString().c_str()); | |
2979 | std::terminate(); | |
2980 | } | |
2981 | thread->shared->UnlockColumnFamily(cf); | |
2982 | } | |
2983 | } | |
7c673cae | 2984 | } |
7c673cae | 2985 | |
11fdf7f2 | 2986 | virtual bool ShouldAcquireMutexOnKey() const { return true; } |
7c673cae | 2987 | |
11fdf7f2 TL |
2988 | virtual Status TestGet(ThreadState* thread, |
2989 | const ReadOptions& read_opts, | |
2990 | const std::vector<int>& rand_column_families, | |
2991 | const std::vector<int64_t>& rand_keys) { | |
2992 | auto cfh = column_families_[rand_column_families[0]]; | |
2993 | std::string key_str = Key(rand_keys[0]); | |
2994 | Slice key = key_str; | |
2995 | std::string from_db; | |
2996 | Status s = db_->Get(read_opts, cfh, key, &from_db); | |
2997 | if (s.ok()) { | |
2998 | // found case | |
2999 | thread->stats.AddGets(1, 1); | |
3000 | } else if (s.IsNotFound()) { | |
3001 | // not found case | |
3002 | thread->stats.AddGets(1, 0); | |
3003 | } else { | |
3004 | // errors case | |
3005 | thread->stats.AddErrors(1); | |
3006 | } | |
3007 | return s; | |
7c673cae | 3008 | } |
11fdf7f2 TL |
3009 | |
3010 | virtual Status TestPrefixScan(ThreadState* thread, | |
3011 | const ReadOptions& read_opts, | |
3012 | const std::vector<int>& rand_column_families, | |
3013 | const std::vector<int64_t>& rand_keys) { | |
3014 | auto cfh = column_families_[rand_column_families[0]]; | |
3015 | std::string key_str = Key(rand_keys[0]); | |
3016 | Slice key = key_str; | |
3017 | Slice prefix = Slice(key.data(), FLAGS_prefix_size); | |
3018 | ||
3019 | std::string upper_bound; | |
3020 | Slice ub_slice; | |
3021 | ReadOptions ro_copy = read_opts; | |
3022 | if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) { | |
3023 | // For half of the time, set the upper bound to the next prefix | |
3024 | ub_slice = Slice(upper_bound); | |
3025 | ro_copy.iterate_upper_bound = &ub_slice; | |
3026 | } | |
3027 | ||
3028 | Iterator* iter = db_->NewIterator(ro_copy, cfh); | |
494da23a | 3029 | long count = 0; |
11fdf7f2 TL |
3030 | for (iter->Seek(prefix); |
3031 | iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { | |
3032 | ++count; | |
3033 | } | |
494da23a | 3034 | assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8))); |
11fdf7f2 TL |
3035 | Status s = iter->status(); |
3036 | if (iter->status().ok()) { | |
494da23a | 3037 | thread->stats.AddPrefixes(1, count); |
11fdf7f2 TL |
3038 | } else { |
3039 | thread->stats.AddErrors(1); | |
3040 | } | |
3041 | delete iter; | |
3042 | return s; | |
3043 | } | |
3044 | ||
3045 | virtual Status TestPut(ThreadState* thread, | |
3046 | WriteOptions& write_opts, const ReadOptions& read_opts, | |
3047 | const std::vector<int>& rand_column_families, | |
3048 | const std::vector<int64_t>& rand_keys, | |
3049 | char (&value) [100], std::unique_ptr<MutexLock>& lock) { | |
3050 | auto shared = thread->shared; | |
3051 | int64_t max_key = shared->GetMaxKey(); | |
3052 | int64_t rand_key = rand_keys[0]; | |
3053 | int rand_column_family = rand_column_families[0]; | |
3054 | while (!shared->AllowsOverwrite(rand_key) && | |
3055 | (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) { | |
3056 | lock.reset(); | |
3057 | rand_key = thread->rand.Next() % max_key; | |
3058 | rand_column_family = thread->rand.Next() % FLAGS_column_families; | |
3059 | lock.reset(new MutexLock( | |
3060 | shared->GetMutexForKey(rand_column_family, rand_key))); | |
3061 | } | |
3062 | ||
3063 | std::string key_str = Key(rand_key); | |
3064 | Slice key = key_str; | |
3065 | ColumnFamilyHandle* cfh = column_families_[rand_column_family]; | |
3066 | ||
3067 | if (FLAGS_verify_before_write) { | |
3068 | std::string key_str2 = Key(rand_key); | |
3069 | Slice k = key_str2; | |
3070 | std::string from_db; | |
3071 | Status s = db_->Get(read_opts, cfh, k, &from_db); | |
3072 | if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, | |
3073 | from_db, s, true)) { | |
3074 | return s; | |
3075 | } | |
3076 | } | |
3077 | uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; | |
3078 | size_t sz = GenerateValue(value_base, value, sizeof(value)); | |
3079 | Slice v(value, sz); | |
3080 | shared->Put(rand_column_family, rand_key, value_base, true /* pending */); | |
3081 | Status s; | |
3082 | if (FLAGS_use_merge) { | |
3083 | if (!FLAGS_use_txn) { | |
3084 | s = db_->Merge(write_opts, cfh, key, v); | |
3085 | } else { | |
3086 | #ifndef ROCKSDB_LITE | |
3087 | Transaction* txn; | |
3088 | s = NewTxn(write_opts, &txn); | |
3089 | if (s.ok()) { | |
3090 | s = txn->Merge(cfh, key, v); | |
3091 | if (s.ok()) { | |
3092 | s = CommitTxn(txn); | |
3093 | } | |
3094 | } | |
3095 | #endif | |
3096 | } | |
3097 | } else { | |
3098 | if (!FLAGS_use_txn) { | |
3099 | s = db_->Put(write_opts, cfh, key, v); | |
3100 | } else { | |
3101 | #ifndef ROCKSDB_LITE | |
3102 | Transaction* txn; | |
3103 | s = NewTxn(write_opts, &txn); | |
3104 | if (s.ok()) { | |
3105 | s = txn->Put(cfh, key, v); | |
3106 | if (s.ok()) { | |
3107 | s = CommitTxn(txn); | |
3108 | } | |
3109 | } | |
3110 | #endif | |
3111 | } | |
3112 | } | |
3113 | shared->Put(rand_column_family, rand_key, value_base, false /* pending */); | |
3114 | if (!s.ok()) { | |
3115 | fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); | |
3116 | std::terminate(); | |
3117 | } | |
3118 | thread->stats.AddBytesForWrites(1, sz); | |
3119 | PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), | |
3120 | value, sz); | |
3121 | return s; | |
3122 | } | |
3123 | ||
3124 | virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, | |
3125 | const std::vector<int>& rand_column_families, | |
3126 | const std::vector<int64_t>& rand_keys, | |
3127 | std::unique_ptr<MutexLock>& lock) { | |
3128 | int64_t rand_key = rand_keys[0]; | |
3129 | int rand_column_family = rand_column_families[0]; | |
3130 | auto shared = thread->shared; | |
3131 | int64_t max_key = shared->GetMaxKey(); | |
3132 | ||
3133 | // OPERATION delete | |
3134 | // If the chosen key does not allow overwrite and it does not exist, | |
3135 | // choose another key. | |
3136 | while (!shared->AllowsOverwrite(rand_key) && | |
3137 | !shared->Exists(rand_column_family, rand_key)) { | |
3138 | lock.reset(); | |
3139 | rand_key = thread->rand.Next() % max_key; | |
3140 | rand_column_family = thread->rand.Next() % FLAGS_column_families; | |
3141 | lock.reset(new MutexLock( | |
3142 | shared->GetMutexForKey(rand_column_family, rand_key))); | |
3143 | } | |
3144 | ||
3145 | std::string key_str = Key(rand_key); | |
3146 | Slice key = key_str; | |
3147 | auto cfh = column_families_[rand_column_family]; | |
3148 | ||
3149 | // Use delete if the key may be overwritten and a single deletion | |
3150 | // otherwise. | |
3151 | Status s; | |
3152 | if (shared->AllowsOverwrite(rand_key)) { | |
3153 | shared->Delete(rand_column_family, rand_key, true /* pending */); | |
3154 | if (!FLAGS_use_txn) { | |
3155 | s = db_->Delete(write_opts, cfh, key); | |
3156 | } else { | |
3157 | #ifndef ROCKSDB_LITE | |
3158 | Transaction* txn; | |
3159 | s = NewTxn(write_opts, &txn); | |
3160 | if (s.ok()) { | |
3161 | s = txn->Delete(cfh, key); | |
3162 | if (s.ok()) { | |
3163 | s = CommitTxn(txn); | |
3164 | } | |
3165 | } | |
3166 | #endif | |
3167 | } | |
3168 | shared->Delete(rand_column_family, rand_key, false /* pending */); | |
3169 | thread->stats.AddDeletes(1); | |
3170 | if (!s.ok()) { | |
3171 | fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); | |
3172 | std::terminate(); | |
3173 | } | |
3174 | } else { | |
3175 | shared->SingleDelete(rand_column_family, rand_key, true /* pending */); | |
3176 | if (!FLAGS_use_txn) { | |
3177 | s = db_->SingleDelete(write_opts, cfh, key); | |
3178 | } else { | |
3179 | #ifndef ROCKSDB_LITE | |
3180 | Transaction* txn; | |
3181 | s = NewTxn(write_opts, &txn); | |
3182 | if (s.ok()) { | |
3183 | s = txn->SingleDelete(cfh, key); | |
3184 | if (s.ok()) { | |
3185 | s = CommitTxn(txn); | |
3186 | } | |
3187 | } | |
3188 | #endif | |
3189 | } | |
3190 | shared->SingleDelete(rand_column_family, rand_key, false /* pending */); | |
3191 | thread->stats.AddSingleDeletes(1); | |
3192 | if (!s.ok()) { | |
3193 | fprintf(stderr, "single delete error: %s\n", | |
3194 | s.ToString().c_str()); | |
3195 | std::terminate(); | |
3196 | } | |
3197 | } | |
3198 | return s; | |
3199 | } | |
3200 | ||
3201 | virtual Status TestDeleteRange(ThreadState* thread, | |
3202 | WriteOptions& write_opts, | |
3203 | const std::vector<int>& rand_column_families, | |
3204 | const std::vector<int64_t>& rand_keys, | |
3205 | std::unique_ptr<MutexLock>& lock) { | |
3206 | // OPERATION delete range | |
3207 | std::vector<std::unique_ptr<MutexLock>> range_locks; | |
3208 | // delete range does not respect disallowed overwrites. the keys for | |
3209 | // which overwrites are disallowed are randomly distributed so it | |
3210 | // could be expensive to find a range where each key allows | |
3211 | // overwrites. | |
3212 | int64_t rand_key = rand_keys[0]; | |
3213 | int rand_column_family = rand_column_families[0]; | |
3214 | auto shared = thread->shared; | |
3215 | int64_t max_key = shared->GetMaxKey(); | |
3216 | if (rand_key > max_key - FLAGS_range_deletion_width) { | |
3217 | lock.reset(); | |
3218 | rand_key = thread->rand.Next() % | |
3219 | (max_key - FLAGS_range_deletion_width + 1); | |
3220 | range_locks.emplace_back(new MutexLock( | |
3221 | shared->GetMutexForKey(rand_column_family, rand_key))); | |
3222 | } else { | |
3223 | range_locks.emplace_back(std::move(lock)); | |
3224 | } | |
3225 | for (int j = 1; j < FLAGS_range_deletion_width; ++j) { | |
3226 | if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { | |
3227 | range_locks.emplace_back(new MutexLock( | |
3228 | shared->GetMutexForKey(rand_column_family, rand_key + j))); | |
3229 | } | |
3230 | } | |
3231 | shared->DeleteRange(rand_column_family, rand_key, | |
3232 | rand_key + FLAGS_range_deletion_width, | |
3233 | true /* pending */); | |
3234 | ||
3235 | std::string keystr = Key(rand_key); | |
3236 | Slice key = keystr; | |
3237 | auto cfh = column_families_[rand_column_family]; | |
3238 | std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); | |
3239 | Slice end_key = end_keystr; | |
3240 | Status s = db_->DeleteRange(write_opts, cfh, key, end_key); | |
3241 | if (!s.ok()) { | |
3242 | fprintf(stderr, "delete range error: %s\n", | |
3243 | s.ToString().c_str()); | |
3244 | std::terminate(); | |
3245 | } | |
3246 | int covered = shared->DeleteRange( | |
3247 | rand_column_family, rand_key, | |
3248 | rand_key + FLAGS_range_deletion_width, false /* pending */); | |
3249 | thread->stats.AddRangeDeletions(1); | |
3250 | thread->stats.AddCoveredByRangeDeletions(covered); | |
3251 | return s; | |
3252 | } | |
3253 | ||
3254 | #ifdef ROCKSDB_LITE | |
3255 | virtual void TestIngestExternalFile( | |
3256 | ThreadState* /* thread */, | |
3257 | const std::vector<int>& /* rand_column_families */, | |
3258 | const std::vector<int64_t>& /* rand_keys */, | |
3259 | std::unique_ptr<MutexLock>& /* lock */) { | |
3260 | assert(false); | |
3261 | fprintf(stderr, | |
3262 | "RocksDB lite does not support " | |
3263 | "TestIngestExternalFile\n"); | |
3264 | std::terminate(); | |
3265 | } | |
3266 | #else | |
3267 | virtual void TestIngestExternalFile( | |
3268 | ThreadState* thread, const std::vector<int>& rand_column_families, | |
3269 | const std::vector<int64_t>& rand_keys, std::unique_ptr<MutexLock>& lock) { | |
3270 | const std::string sst_filename = | |
3271 | FLAGS_db + "/." + ToString(thread->tid) + ".sst"; | |
3272 | Status s; | |
3273 | if (FLAGS_env->FileExists(sst_filename).ok()) { | |
3274 | // Maybe we terminated abnormally before, so cleanup to give this file | |
3275 | // ingestion a clean slate | |
3276 | s = FLAGS_env->DeleteFile(sst_filename); | |
3277 | } | |
3278 | ||
3279 | SstFileWriter sst_file_writer(EnvOptions(), options_); | |
3280 | if (s.ok()) { | |
3281 | s = sst_file_writer.Open(sst_filename); | |
3282 | } | |
3283 | int64_t key_base = rand_keys[0]; | |
3284 | int column_family = rand_column_families[0]; | |
3285 | std::vector<std::unique_ptr<MutexLock> > range_locks; | |
3286 | std::vector<uint32_t> values; | |
3287 | SharedState* shared = thread->shared; | |
3288 | ||
3289 | // Grab locks, set pending state on expected values, and add keys | |
3290 | for (int64_t key = key_base; | |
3291 | s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width, | |
3292 | shared->GetMaxKey()); | |
3293 | ++key) { | |
3294 | if (key == key_base) { | |
3295 | range_locks.emplace_back(std::move(lock)); | |
3296 | } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { | |
3297 | range_locks.emplace_back( | |
3298 | new MutexLock(shared->GetMutexForKey(column_family, key))); | |
3299 | } | |
3300 | ||
3301 | uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; | |
3302 | values.push_back(value_base); | |
3303 | shared->Put(column_family, key, value_base, true /* pending */); | |
3304 | ||
3305 | char value[100]; | |
3306 | size_t value_len = GenerateValue(value_base, value, sizeof(value)); | |
3307 | auto key_str = Key(key); | |
3308 | s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len)); | |
3309 | } | |
3310 | ||
3311 | if (s.ok()) { | |
3312 | s = sst_file_writer.Finish(); | |
3313 | } | |
3314 | if (s.ok()) { | |
3315 | s = db_->IngestExternalFile(column_families_[column_family], | |
3316 | {sst_filename}, IngestExternalFileOptions()); | |
3317 | } | |
3318 | if (!s.ok()) { | |
3319 | fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str()); | |
3320 | std::terminate(); | |
3321 | } | |
3322 | int64_t key = key_base; | |
3323 | for (int32_t value : values) { | |
3324 | shared->Put(column_family, key, value, false /* pending */); | |
3325 | ++key; | |
3326 | } | |
3327 | } | |
3328 | #endif // ROCKSDB_LITE | |
3329 | ||
3330 | bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, | |
3331 | SharedState* shared, const std::string& value_from_db, | |
3332 | Status s, bool strict = false) const { | |
3333 | if (shared->HasVerificationFailedYet()) { | |
3334 | return false; | |
3335 | } | |
3336 | // compare value_from_db with the value in the shared state | |
3337 | char value[kValueMaxLen]; | |
3338 | uint32_t value_base = shared->Get(cf, key); | |
3339 | if (value_base == SharedState::UNKNOWN_SENTINEL) { | |
3340 | return true; | |
3341 | } | |
3342 | if (value_base == SharedState::DELETION_SENTINEL && !strict) { | |
3343 | return true; | |
3344 | } | |
3345 | ||
3346 | if (s.ok()) { | |
3347 | if (value_base == SharedState::DELETION_SENTINEL) { | |
3348 | VerificationAbort(shared, "Unexpected value found", cf, key); | |
3349 | return false; | |
3350 | } | |
3351 | size_t sz = GenerateValue(value_base, value, sizeof(value)); | |
3352 | if (value_from_db.length() != sz) { | |
3353 | VerificationAbort(shared, "Length of value read is not equal", cf, key); | |
3354 | return false; | |
3355 | } | |
3356 | if (memcmp(value_from_db.data(), value, sz) != 0) { | |
3357 | VerificationAbort(shared, "Contents of value read don't match", cf, | |
3358 | key); | |
3359 | return false; | |
3360 | } | |
3361 | } else { | |
3362 | if (value_base != SharedState::DELETION_SENTINEL) { | |
3363 | VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); | |
3364 | return false; | |
3365 | } | |
3366 | } | |
3367 | return true; | |
3368 | } | |
3369 | }; | |
3370 | ||
3371 | class BatchedOpsStressTest : public StressTest { | |
3372 | public: | |
3373 | BatchedOpsStressTest() {} | |
3374 | virtual ~BatchedOpsStressTest() {} | |
3375 | ||
3376 | // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... | |
3377 | // ("9"+K, "9"+V) in DB atomically i.e in a single batch. | |
3378 | // Also refer BatchedOpsStressTest::TestGet | |
3379 | virtual Status TestPut(ThreadState* thread, | |
3380 | WriteOptions& write_opts, const ReadOptions& /* read_opts */, | |
3381 | const std::vector<int>& rand_column_families, const std::vector<int64_t>& rand_keys, | |
3382 | char (&value)[100], std::unique_ptr<MutexLock>& /* lock */) { | |
3383 | uint32_t value_base = | |
3384 | thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; | |
3385 | size_t sz = GenerateValue(value_base, value, sizeof(value)); | |
3386 | Slice v(value, sz); | |
3387 | std::string keys[10] = {"9", "8", "7", "6", "5", | |
3388 | "4", "3", "2", "1", "0"}; | |
3389 | std::string values[10] = {"9", "8", "7", "6", "5", | |
3390 | "4", "3", "2", "1", "0"}; | |
3391 | Slice value_slices[10]; | |
3392 | WriteBatch batch; | |
3393 | Status s; | |
3394 | auto cfh = column_families_[rand_column_families[0]]; | |
3395 | std::string key_str = Key(rand_keys[0]); | |
3396 | for (int i = 0; i < 10; i++) { | |
3397 | keys[i] += key_str; | |
3398 | values[i] += v.ToString(); | |
3399 | value_slices[i] = values[i]; | |
3400 | if (FLAGS_use_merge) { | |
3401 | batch.Merge(cfh, keys[i], value_slices[i]); | |
3402 | } else { | |
3403 | batch.Put(cfh, keys[i], value_slices[i]); | |
3404 | } | |
3405 | } | |
3406 | ||
3407 | s = db_->Write(write_opts, &batch); | |
3408 | if (!s.ok()) { | |
3409 | fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); | |
3410 | thread->stats.AddErrors(1); | |
3411 | } else { | |
3412 | // we did 10 writes each of size sz + 1 | |
3413 | thread->stats.AddBytesForWrites(10, (sz + 1) * 10); | |
3414 | } | |
3415 | ||
3416 | return s; | |
3417 | } | |
3418 | ||
3419 | // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K) | |
3420 | // in DB atomically i.e in a single batch. Also refer MultiGet. | |
3421 | virtual Status TestDelete(ThreadState* thread, WriteOptions& writeoptions, | |
3422 | const std::vector<int>& rand_column_families, | |
3423 | const std::vector<int64_t>& rand_keys, | |
3424 | std::unique_ptr<MutexLock>& /* lock */) { | |
3425 | std::string keys[10] = {"9", "7", "5", "3", "1", | |
3426 | "8", "6", "4", "2", "0"}; | |
3427 | ||
3428 | WriteBatch batch; | |
3429 | Status s; | |
3430 | auto cfh = column_families_[rand_column_families[0]]; | |
3431 | std::string key_str = Key(rand_keys[0]); | |
3432 | for (int i = 0; i < 10; i++) { | |
3433 | keys[i] += key_str; | |
3434 | batch.Delete(cfh, keys[i]); | |
3435 | } | |
3436 | ||
3437 | s = db_->Write(writeoptions, &batch); | |
3438 | if (!s.ok()) { | |
3439 | fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str()); | |
3440 | thread->stats.AddErrors(1); | |
3441 | } else { | |
3442 | thread->stats.AddDeletes(10); | |
3443 | } | |
3444 | ||
3445 | return s; | |
3446 | } | |
3447 | ||
3448 | virtual Status TestDeleteRange(ThreadState* /* thread */, | |
3449 | WriteOptions& /* write_opts */, | |
3450 | const std::vector<int>& /* rand_column_families */, | |
3451 | const std::vector<int64_t>& /* rand_keys */, | |
3452 | std::unique_ptr<MutexLock>& /* lock */) { | |
3453 | assert(false); | |
3454 | return Status::NotSupported("BatchedOpsStressTest does not support " | |
3455 | "TestDeleteRange"); | |
3456 | } | |
3457 | ||
3458 | virtual void TestIngestExternalFile( | |
3459 | ThreadState* /* thread */, | |
3460 | const std::vector<int>& /* rand_column_families */, | |
3461 | const std::vector<int64_t>& /* rand_keys */, | |
3462 | std::unique_ptr<MutexLock>& /* lock */) { | |
3463 | assert(false); | |
3464 | fprintf(stderr, | |
3465 | "BatchedOpsStressTest does not support " | |
3466 | "TestIngestExternalFile\n"); | |
3467 | std::terminate(); | |
3468 | } | |
3469 | ||
3470 | // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K | |
3471 | // in the same snapshot, and verifies that all the values are of the form | |
3472 | // "0"+V, "1"+V,..."9"+V. | |
3473 | // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into | |
3474 | // the DB. | |
3475 | virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions, | |
3476 | const std::vector<int>& rand_column_families, | |
3477 | const std::vector<int64_t>& rand_keys) { | |
3478 | std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; | |
3479 | Slice key_slices[10]; | |
3480 | std::string values[10]; | |
3481 | ReadOptions readoptionscopy = readoptions; | |
3482 | readoptionscopy.snapshot = db_->GetSnapshot(); | |
3483 | std::string key_str = Key(rand_keys[0]); | |
3484 | Slice key = key_str; | |
3485 | auto cfh = column_families_[rand_column_families[0]]; | |
3486 | std::string from_db; | |
3487 | Status s; | |
3488 | for (int i = 0; i < 10; i++) { | |
3489 | keys[i] += key.ToString(); | |
3490 | key_slices[i] = keys[i]; | |
3491 | s = db_->Get(readoptionscopy, cfh, key_slices[i], &from_db); | |
3492 | if (!s.ok() && !s.IsNotFound()) { | |
3493 | fprintf(stderr, "get error: %s\n", s.ToString().c_str()); | |
3494 | values[i] = ""; | |
3495 | thread->stats.AddErrors(1); | |
3496 | // we continue after error rather than exiting so that we can | |
3497 | // find more errors if any | |
3498 | } else if (s.IsNotFound()) { | |
3499 | values[i] = ""; | |
3500 | thread->stats.AddGets(1, 0); | |
3501 | } else { | |
3502 | values[i] = from_db; | |
3503 | ||
3504 | char expected_prefix = (keys[i])[0]; | |
3505 | char actual_prefix = (values[i])[0]; | |
3506 | if (actual_prefix != expected_prefix) { | |
3507 | fprintf(stderr, "error expected prefix = %c actual = %c\n", | |
3508 | expected_prefix, actual_prefix); | |
3509 | } | |
3510 | (values[i])[0] = ' '; // blank out the differing character | |
3511 | thread->stats.AddGets(1, 1); | |
3512 | } | |
3513 | } | |
3514 | db_->ReleaseSnapshot(readoptionscopy.snapshot); | |
3515 | ||
3516 | // Now that we retrieved all values, check that they all match | |
3517 | for (int i = 1; i < 10; i++) { | |
3518 | if (values[i] != values[0]) { | |
3519 | fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n", | |
3520 | key.ToString(true).c_str(), StringToHex(values[0]).c_str(), | |
3521 | StringToHex(values[i]).c_str()); | |
3522 | // we continue after error rather than exiting so that we can | |
3523 | // find more errors if any | |
3524 | } | |
3525 | } | |
3526 | ||
3527 | return s; | |
3528 | } | |
3529 | ||
3530 | // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P | |
3531 | // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes | |
3532 | // of the key. Each of these 10 scans returns a series of values; | |
3533 | // each series should be the same length, and it is verified for each | |
3534 | // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V. | |
3535 | // ASSUMES that MultiPut was used to put (K, V) | |
3536 | virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions, | |
3537 | const std::vector<int>& rand_column_families, | |
3538 | const std::vector<int64_t>& rand_keys) { | |
3539 | std::string key_str = Key(rand_keys[0]); | |
3540 | Slice key = key_str; | |
3541 | auto cfh = column_families_[rand_column_families[0]]; | |
3542 | std::string prefixes[10] = {"0", "1", "2", "3", "4", | |
3543 | "5", "6", "7", "8", "9"}; | |
3544 | Slice prefix_slices[10]; | |
3545 | ReadOptions readoptionscopy[10]; | |
3546 | const Snapshot* snapshot = db_->GetSnapshot(); | |
3547 | Iterator* iters[10]; | |
3548 | std::string upper_bounds[10]; | |
3549 | Slice ub_slices[10]; | |
3550 | Status s = Status::OK(); | |
3551 | for (int i = 0; i < 10; i++) { | |
3552 | prefixes[i] += key.ToString(); | |
3553 | prefixes[i].resize(FLAGS_prefix_size); | |
3554 | prefix_slices[i] = Slice(prefixes[i]); | |
3555 | readoptionscopy[i] = readoptions; | |
3556 | readoptionscopy[i].snapshot = snapshot; | |
3557 | if (thread->rand.OneIn(2) && | |
3558 | GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) { | |
3559 | // For half of the time, set the upper bound to the next prefix | |
3560 | ub_slices[i] = Slice(upper_bounds[i]); | |
3561 | readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]); | |
3562 | } | |
3563 | iters[i] = db_->NewIterator(readoptionscopy[i], cfh); | |
3564 | iters[i]->Seek(prefix_slices[i]); | |
3565 | } | |
3566 | ||
494da23a | 3567 | long count = 0; |
11fdf7f2 TL |
3568 | while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) { |
3569 | count++; | |
3570 | std::string values[10]; | |
3571 | // get list of all values for this iteration | |
3572 | for (int i = 0; i < 10; i++) { | |
3573 | // no iterator should finish before the first one | |
3574 | assert(iters[i]->Valid() && | |
3575 | iters[i]->key().starts_with(prefix_slices[i])); | |
3576 | values[i] = iters[i]->value().ToString(); | |
3577 | ||
3578 | char expected_first = (prefixes[i])[0]; | |
3579 | char actual_first = (values[i])[0]; | |
3580 | ||
3581 | if (actual_first != expected_first) { | |
3582 | fprintf(stderr, "error expected first = %c actual = %c\n", | |
3583 | expected_first, actual_first); | |
3584 | } | |
3585 | (values[i])[0] = ' '; // blank out the differing character | |
3586 | } | |
3587 | // make sure all values are equivalent | |
3588 | for (int i = 0; i < 10; i++) { | |
3589 | if (values[i] != values[0]) { | |
3590 | fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n", | |
3591 | i, prefixes[i].c_str(), StringToHex(values[0]).c_str(), | |
3592 | StringToHex(values[i]).c_str()); | |
3593 | // we continue after error rather than exiting so that we can | |
3594 | // find more errors if any | |
3595 | } | |
3596 | iters[i]->Next(); | |
3597 | } | |
3598 | } | |
3599 | ||
3600 | // cleanup iterators and snapshot | |
3601 | for (int i = 0; i < 10; i++) { | |
3602 | // if the first iterator finished, they should have all finished | |
3603 | assert(!iters[i]->Valid() || | |
3604 | !iters[i]->key().starts_with(prefix_slices[i])); | |
3605 | assert(iters[i]->status().ok()); | |
3606 | delete iters[i]; | |
3607 | } | |
3608 | db_->ReleaseSnapshot(snapshot); | |
3609 | ||
3610 | if (s.ok()) { | |
3611 | thread->stats.AddPrefixes(1, count); | |
3612 | } else { | |
3613 | thread->stats.AddErrors(1); | |
3614 | } | |
3615 | ||
3616 | return s; | |
3617 | } | |
3618 | ||
3619 | virtual void VerifyDb(ThreadState* /* thread */) const {} | |
3620 | }; | |
3621 | ||
494da23a TL |
3622 | class AtomicFlushStressTest : public StressTest { |
3623 | public: | |
3624 | AtomicFlushStressTest() : batch_id_(0) {} | |
3625 | ||
3626 | virtual ~AtomicFlushStressTest() {} | |
3627 | ||
3628 | virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts, | |
3629 | const ReadOptions& /* read_opts */, | |
3630 | const std::vector<int>& rand_column_families, | |
3631 | const std::vector<int64_t>& rand_keys, | |
3632 | char (&value)[100], | |
3633 | std::unique_ptr<MutexLock>& /* lock */) { | |
3634 | std::string key_str = Key(rand_keys[0]); | |
3635 | Slice key = key_str; | |
3636 | uint64_t value_base = batch_id_.fetch_add(1); | |
3637 | size_t sz = | |
3638 | GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value)); | |
3639 | Slice v(value, sz); | |
3640 | WriteBatch batch; | |
3641 | for (auto cf : rand_column_families) { | |
3642 | ColumnFamilyHandle* cfh = column_families_[cf]; | |
3643 | if (FLAGS_use_merge) { | |
3644 | batch.Merge(cfh, key, v); | |
3645 | } else { /* !FLAGS_use_merge */ | |
3646 | batch.Put(cfh, key, v); | |
3647 | } | |
3648 | } | |
3649 | Status s = db_->Write(write_opts, &batch); | |
3650 | if (!s.ok()) { | |
3651 | fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str()); | |
3652 | thread->stats.AddErrors(1); | |
3653 | } else { | |
3654 | auto num = static_cast<long>(rand_column_families.size()); | |
3655 | thread->stats.AddBytesForWrites(num, (sz + 1) * num); | |
3656 | } | |
3657 | ||
3658 | return s; | |
3659 | } | |
3660 | ||
3661 | virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, | |
3662 | const std::vector<int>& rand_column_families, | |
3663 | const std::vector<int64_t>& rand_keys, | |
3664 | std::unique_ptr<MutexLock>& /* lock */) { | |
3665 | std::string key_str = Key(rand_keys[0]); | |
3666 | Slice key = key_str; | |
3667 | WriteBatch batch; | |
3668 | for (auto cf : rand_column_families) { | |
3669 | ColumnFamilyHandle* cfh = column_families_[cf]; | |
3670 | batch.Delete(cfh, key); | |
3671 | } | |
3672 | Status s = db_->Write(write_opts, &batch); | |
3673 | if (!s.ok()) { | |
3674 | fprintf(stderr, "multidel error: %s\n", s.ToString().c_str()); | |
3675 | thread->stats.AddErrors(1); | |
3676 | } else { | |
3677 | thread->stats.AddDeletes(static_cast<long>(rand_column_families.size())); | |
3678 | } | |
3679 | return s; | |
3680 | } | |
3681 | ||
3682 | virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, | |
3683 | const std::vector<int>& rand_column_families, | |
3684 | const std::vector<int64_t>& rand_keys, | |
3685 | std::unique_ptr<MutexLock>& /* lock */) { | |
3686 | int64_t rand_key = rand_keys[0]; | |
3687 | auto shared = thread->shared; | |
3688 | int64_t max_key = shared->GetMaxKey(); | |
3689 | if (rand_key > max_key - FLAGS_range_deletion_width) { | |
3690 | rand_key = | |
3691 | thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1); | |
3692 | } | |
3693 | std::string key_str = Key(rand_key); | |
3694 | Slice key = key_str; | |
3695 | std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width); | |
3696 | Slice end_key = end_key_str; | |
3697 | WriteBatch batch; | |
3698 | for (auto cf : rand_column_families) { | |
3699 | ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]]; | |
3700 | batch.DeleteRange(cfh, key, end_key); | |
3701 | } | |
3702 | Status s = db_->Write(write_opts, &batch); | |
3703 | if (!s.ok()) { | |
3704 | fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str()); | |
3705 | thread->stats.AddErrors(1); | |
3706 | } else { | |
3707 | thread->stats.AddRangeDeletions( | |
3708 | static_cast<long>(rand_column_families.size())); | |
3709 | } | |
3710 | return s; | |
3711 | } | |
3712 | ||
3713 | virtual void TestIngestExternalFile( | |
3714 | ThreadState* /* thread */, | |
3715 | const std::vector<int>& /* rand_column_families */, | |
3716 | const std::vector<int64_t>& /* rand_keys */, | |
3717 | std::unique_ptr<MutexLock>& /* lock */) { | |
3718 | assert(false); | |
3719 | fprintf(stderr, | |
3720 | "AtomicFlushStressTest does not support TestIngestExternalFile " | |
3721 | "because it's not possible to verify the result\n"); | |
3722 | std::terminate(); | |
3723 | } | |
3724 | ||
3725 | virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions, | |
3726 | const std::vector<int>& rand_column_families, | |
3727 | const std::vector<int64_t>& rand_keys) { | |
3728 | std::string key_str = Key(rand_keys[0]); | |
3729 | Slice key = key_str; | |
3730 | auto cfh = | |
3731 | column_families_[rand_column_families[thread->rand.Next() % | |
3732 | rand_column_families.size()]]; | |
3733 | std::string from_db; | |
3734 | Status s = db_->Get(readoptions, cfh, key, &from_db); | |
3735 | if (s.ok()) { | |
3736 | thread->stats.AddGets(1, 1); | |
3737 | } else if (s.IsNotFound()) { | |
3738 | thread->stats.AddGets(1, 0); | |
3739 | } else { | |
3740 | thread->stats.AddErrors(1); | |
3741 | } | |
3742 | return s; | |
3743 | } | |
3744 | ||
3745 | virtual Status TestPrefixScan(ThreadState* thread, | |
3746 | const ReadOptions& readoptions, | |
3747 | const std::vector<int>& rand_column_families, | |
3748 | const std::vector<int64_t>& rand_keys) { | |
3749 | std::string key_str = Key(rand_keys[0]); | |
3750 | Slice key = key_str; | |
3751 | Slice prefix = Slice(key.data(), FLAGS_prefix_size); | |
3752 | ||
3753 | std::string upper_bound; | |
3754 | Slice ub_slice; | |
3755 | ReadOptions ro_copy = readoptions; | |
3756 | if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) { | |
3757 | ub_slice = Slice(upper_bound); | |
3758 | ro_copy.iterate_upper_bound = &ub_slice; | |
3759 | } | |
3760 | auto cfh = | |
3761 | column_families_[rand_column_families[thread->rand.Next() % | |
3762 | rand_column_families.size()]]; | |
3763 | Iterator* iter = db_->NewIterator(ro_copy, cfh); | |
3764 | long count = 0; | |
3765 | for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); | |
3766 | iter->Next()) { | |
3767 | ++count; | |
3768 | } | |
3769 | assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8))); | |
3770 | Status s = iter->status(); | |
3771 | if (s.ok()) { | |
3772 | thread->stats.AddPrefixes(1, count); | |
3773 | } else { | |
3774 | thread->stats.AddErrors(1); | |
3775 | } | |
3776 | delete iter; | |
3777 | return s; | |
3778 | } | |
3779 | ||
3780 | #ifdef ROCKSDB_LITE | |
3781 | virtual Status TestCheckpoint( | |
3782 | ThreadState* /* thread */, | |
3783 | const std::vector<int>& /* rand_column_families */, | |
3784 | const std::vector<int64_t>& /* rand_keys */) { | |
3785 | assert(false); | |
3786 | fprintf(stderr, | |
3787 | "RocksDB lite does not support " | |
3788 | "TestCheckpoint\n"); | |
3789 | std::terminate(); | |
3790 | } | |
3791 | #else | |
3792 | virtual Status TestCheckpoint( | |
3793 | ThreadState* thread, const std::vector<int>& /* rand_column_families */, | |
3794 | const std::vector<int64_t>& /* rand_keys */) { | |
3795 | std::string checkpoint_dir = | |
3796 | FLAGS_db + "/.checkpoint" + ToString(thread->tid); | |
3797 | DestroyDB(checkpoint_dir, Options()); | |
3798 | Checkpoint* checkpoint = nullptr; | |
3799 | Status s = Checkpoint::Create(db_, &checkpoint); | |
3800 | if (s.ok()) { | |
3801 | s = checkpoint->CreateCheckpoint(checkpoint_dir); | |
3802 | } | |
3803 | std::vector<ColumnFamilyHandle*> cf_handles; | |
3804 | DB* checkpoint_db = nullptr; | |
3805 | if (s.ok()) { | |
3806 | delete checkpoint; | |
3807 | checkpoint = nullptr; | |
3808 | Options options(options_); | |
3809 | options.listeners.clear(); | |
3810 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
3811 | // TODO(ajkr): `column_family_names_` is not safe to access here when | |
3812 | // `clear_column_family_one_in != 0`. But we can't easily switch to | |
3813 | // `ListColumnFamilies` to get names because it won't necessarily give | |
3814 | // the same order as `column_family_names_`. | |
3815 | if (FLAGS_clear_column_family_one_in == 0) { | |
3816 | for (const auto& name : column_family_names_) { | |
3817 | cf_descs.emplace_back(name, ColumnFamilyOptions(options)); | |
3818 | } | |
3819 | s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs, | |
3820 | &cf_handles, &checkpoint_db); | |
3821 | } | |
3822 | } | |
3823 | if (checkpoint_db != nullptr) { | |
3824 | for (auto cfh : cf_handles) { | |
3825 | delete cfh; | |
3826 | } | |
3827 | cf_handles.clear(); | |
3828 | delete checkpoint_db; | |
3829 | checkpoint_db = nullptr; | |
3830 | } | |
3831 | DestroyDB(checkpoint_dir, Options()); | |
3832 | if (!s.ok()) { | |
3833 | fprintf(stderr, "A checkpoint operation failed with: %s\n", | |
3834 | s.ToString().c_str()); | |
3835 | } | |
3836 | return s; | |
3837 | } | |
3838 | #endif // !ROCKSDB_LITE | |
3839 | ||
3840 | virtual void VerifyDb(ThreadState* thread) const { | |
3841 | ReadOptions options(FLAGS_verify_checksum, true); | |
3842 | // We must set total_order_seek to true because we are doing a SeekToFirst | |
3843 | // on a column family whose memtables may support (by default) prefix-based | |
3844 | // iterator. In this case, NewIterator with options.total_order_seek being | |
3845 | // false returns a prefix-based iterator. Calling SeekToFirst using this | |
3846 | // iterator causes the iterator to become invalid. That means we cannot | |
3847 | // iterate the memtable using this iterator any more, although the memtable | |
3848 | // contains the most up-to-date key-values. | |
3849 | options.total_order_seek = true; | |
3850 | assert(thread != nullptr); | |
3851 | auto shared = thread->shared; | |
3852 | std::vector<std::unique_ptr<Iterator> > iters(column_families_.size()); | |
3853 | for (size_t i = 0; i != column_families_.size(); ++i) { | |
3854 | iters[i].reset(db_->NewIterator(options, column_families_[i])); | |
3855 | } | |
3856 | for (auto& iter : iters) { | |
3857 | iter->SeekToFirst(); | |
3858 | } | |
3859 | size_t num = column_families_.size(); | |
3860 | assert(num == iters.size()); | |
3861 | std::vector<Status> statuses(num, Status::OK()); | |
3862 | do { | |
3863 | size_t valid_cnt = 0; | |
3864 | size_t idx = 0; | |
3865 | for (auto& iter : iters) { | |
3866 | if (iter->Valid()) { | |
3867 | ++valid_cnt; | |
3868 | } else { | |
3869 | statuses[idx] = iter->status(); | |
3870 | } | |
3871 | ++idx; | |
3872 | } | |
3873 | if (valid_cnt == 0) { | |
3874 | Status status; | |
3875 | for (size_t i = 0; i != num; ++i) { | |
3876 | const auto& s = statuses[i]; | |
3877 | if (!s.ok()) { | |
3878 | status = s; | |
3879 | fprintf(stderr, "Iterator on cf %s has error: %s\n", | |
3880 | column_families_[i]->GetName().c_str(), | |
3881 | s.ToString().c_str()); | |
3882 | shared->SetVerificationFailure(); | |
3883 | } | |
3884 | } | |
3885 | if (status.ok()) { | |
3886 | fprintf(stdout, "Finished scanning all column families.\n"); | |
3887 | } | |
3888 | break; | |
3889 | } else if (valid_cnt != iters.size()) { | |
3890 | for (size_t i = 0; i != num; ++i) { | |
3891 | if (!iters[i]->Valid()) { | |
3892 | if (statuses[i].ok()) { | |
3893 | fprintf(stderr, "Finished scanning cf %s\n", | |
3894 | column_families_[i]->GetName().c_str()); | |
3895 | } else { | |
3896 | fprintf(stderr, "Iterator on cf %s has error: %s\n", | |
3897 | column_families_[i]->GetName().c_str(), | |
3898 | statuses[i].ToString().c_str()); | |
3899 | } | |
3900 | } else { | |
3901 | fprintf(stderr, "cf %s has remaining data to scan\n", | |
3902 | column_families_[i]->GetName().c_str()); | |
3903 | } | |
3904 | } | |
3905 | shared->SetVerificationFailure(); | |
3906 | break; | |
3907 | } | |
3908 | // If the program reaches here, then all column families' iterators are | |
3909 | // still valid. | |
3910 | Slice key; | |
3911 | Slice value; | |
3912 | for (size_t i = 0; i != num; ++i) { | |
3913 | if (i == 0) { | |
3914 | key = iters[i]->key(); | |
3915 | value = iters[i]->value(); | |
3916 | } else { | |
3917 | if (key.compare(iters[i]->key()) != 0) { | |
3918 | fprintf(stderr, "Verification failed\n"); | |
3919 | fprintf(stderr, "cf%s: %s => %s\n", | |
3920 | column_families_[0]->GetName().c_str(), | |
3921 | key.ToString(true /* hex */).c_str(), | |
3922 | value.ToString(/* hex */).c_str()); | |
3923 | fprintf(stderr, "cf%s: %s => %s\n", | |
3924 | column_families_[i]->GetName().c_str(), | |
3925 | iters[i]->key().ToString(true /* hex */).c_str(), | |
3926 | iters[i]->value().ToString(true /* hex */).c_str()); | |
3927 | shared->SetVerificationFailure(); | |
3928 | } | |
3929 | } | |
3930 | } | |
3931 | for (auto& iter : iters) { | |
3932 | iter->Next(); | |
3933 | } | |
3934 | } while (true); | |
3935 | } | |
3936 | ||
3937 | virtual std::vector<int> GenerateColumnFamilies( | |
3938 | const int /* num_column_families */, int /* rand_column_family */) const { | |
3939 | std::vector<int> ret; | |
3940 | int num = static_cast<int>(column_families_.size()); | |
3941 | int k = 0; | |
3942 | std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; }); | |
3943 | return ret; | |
3944 | } | |
3945 | ||
3946 | private: | |
3947 | std::atomic<int64_t> batch_id_; | |
3948 | }; | |
3949 | ||
11fdf7f2 TL |
3950 | } // namespace rocksdb |
3951 | ||
3952 | int main(int argc, char** argv) { | |
3953 | SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + | |
3954 | " [OPTIONS]..."); | |
3955 | ParseCommandLineFlags(&argc, &argv, true); | |
3956 | ||
3957 | if (FLAGS_statistics) { | |
3958 | dbstats = rocksdb::CreateDBStatistics(); | |
3959 | } | |
3960 | FLAGS_compression_type_e = | |
3961 | StringToCompressionType(FLAGS_compression_type.c_str()); | |
3962 | FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str()); | |
3963 | if (!FLAGS_hdfs.empty()) { | |
3964 | FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs); | |
3965 | } | |
3966 | FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); | |
3967 | ||
3968 | // The number of background threads should be at least as much the | |
3969 | // max number of concurrent compactions. | |
3970 | FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); | |
3971 | FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads, | |
3972 | rocksdb::Env::Priority::BOTTOM); | |
3973 | if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) { | |
3974 | fprintf(stderr, | |
3975 | "Error: prefixpercent is non-zero while prefix_size is " | |
3976 | "not positive!\n"); | |
3977 | exit(1); | |
3978 | } | |
3979 | if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) { | |
3980 | fprintf(stderr, | |
3981 | "Error: please specify prefix_size for " | |
3982 | "test_batches_snapshots test!\n"); | |
3983 | exit(1); | |
3984 | } | |
3985 | if (FLAGS_memtable_prefix_bloom_size_ratio > 0.0 && FLAGS_prefix_size <= 0) { | |
3986 | fprintf(stderr, | |
3987 | "Error: please specify positive prefix_size in order to use " | |
3988 | "memtable_prefix_bloom_size_ratio\n"); | |
3989 | exit(1); | |
7c673cae FG |
3990 | } |
3991 | if ((FLAGS_readpercent + FLAGS_prefixpercent + | |
3992 | FLAGS_writepercent + FLAGS_delpercent + FLAGS_delrangepercent + | |
3993 | FLAGS_iterpercent) != 100) { | |
3994 | fprintf(stderr, | |
3995 | "Error: Read+Prefix+Write+Delete+DeleteRange+Iterate percents != " | |
3996 | "100!\n"); | |
3997 | exit(1); | |
3998 | } | |
3999 | if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) { | |
11fdf7f2 TL |
4000 | fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n"); |
4001 | exit(1); | |
7c673cae FG |
4002 | } |
4003 | if ((unsigned)FLAGS_reopen >= FLAGS_ops_per_thread) { | |
4004 | fprintf(stderr, | |
4005 | "Error: #DB-reopens should be < ops_per_thread\n" | |
4006 | "Provided reopens = %d and ops_per_thread = %lu\n", | |
4007 | FLAGS_reopen, | |
4008 | (unsigned long)FLAGS_ops_per_thread); | |
4009 | exit(1); | |
4010 | } | |
4011 | if (FLAGS_test_batches_snapshots && FLAGS_delrangepercent > 0) { | |
4012 | fprintf(stderr, "Error: nonzero delrangepercent unsupported in " | |
4013 | "test_batches_snapshots mode\n"); | |
4014 | exit(1); | |
4015 | } | |
11fdf7f2 TL |
4016 | if (FLAGS_active_width > FLAGS_max_key) { |
4017 | fprintf(stderr, "Error: active_width can be at most max_key\n"); | |
4018 | exit(1); | |
4019 | } else if (FLAGS_active_width == 0) { | |
4020 | FLAGS_active_width = FLAGS_max_key; | |
4021 | } | |
4022 | if (FLAGS_value_size_mult * kRandomValueMaxFactor > kValueMaxLen) { | |
4023 | fprintf(stderr, "Error: value_size_mult can be at most %d\n", | |
4024 | kValueMaxLen / kRandomValueMaxFactor); | |
4025 | exit(1); | |
4026 | } | |
4027 | if (FLAGS_use_merge && FLAGS_nooverwritepercent == 100) { | |
4028 | fprintf( | |
4029 | stderr, | |
4030 | "Error: nooverwritepercent must not be 100 when using merge operands"); | |
4031 | exit(1); | |
4032 | } | |
4033 | if (FLAGS_ingest_external_file_one_in > 0 && FLAGS_nooverwritepercent > 0) { | |
4034 | fprintf(stderr, | |
4035 | "Error: nooverwritepercent must be 0 when using file ingestion\n"); | |
4036 | exit(1); | |
4037 | } | |
494da23a TL |
4038 | if (FLAGS_clear_column_family_one_in > 0 && FLAGS_backup_one_in > 0) { |
4039 | fprintf(stderr, | |
4040 | "Error: clear_column_family_one_in must be 0 when using backup\n"); | |
4041 | exit(1); | |
4042 | } | |
4043 | if (FLAGS_test_atomic_flush) { | |
4044 | FLAGS_atomic_flush = true; | |
4045 | } | |
4046 | if (FLAGS_read_only) { | |
4047 | if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 || | |
4048 | FLAGS_delrangepercent != 0) { | |
4049 | fprintf(stderr, "Error: updates are not supported in read only mode\n"); | |
4050 | exit(1); | |
4051 | } else if (FLAGS_checkpoint_one_in > 0 && | |
4052 | FLAGS_clear_column_family_one_in > 0) { | |
4053 | fprintf(stdout, | |
4054 | "Warn: checkpoint won't be validated since column families may " | |
4055 | "be dropped.\n"); | |
4056 | } | |
4057 | } | |
7c673cae FG |
4058 | |
4059 | // Choose a location for the test database if none given with --db=<path> | |
4060 | if (FLAGS_db.empty()) { | |
4061 | std::string default_db_path; | |
4062 | rocksdb::Env::Default()->GetTestDirectory(&default_db_path); | |
4063 | default_db_path += "/dbstress"; | |
4064 | FLAGS_db = default_db_path; | |
4065 | } | |
4066 | ||
4067 | rocksdb_kill_odds = FLAGS_kill_random_test; | |
4068 | rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); | |
4069 | ||
11fdf7f2 | 4070 | std::unique_ptr<rocksdb::StressTest> stress; |
494da23a TL |
4071 | if (FLAGS_test_atomic_flush) { |
4072 | stress.reset(new rocksdb::AtomicFlushStressTest()); | |
4073 | } else if (FLAGS_test_batches_snapshots) { | |
11fdf7f2 TL |
4074 | stress.reset(new rocksdb::BatchedOpsStressTest()); |
4075 | } else { | |
4076 | stress.reset(new rocksdb::NonBatchedOpsStressTest()); | |
4077 | } | |
4078 | if (stress->Run()) { | |
7c673cae FG |
4079 | return 0; |
4080 | } else { | |
4081 | return 1; | |
4082 | } | |
4083 | } | |
4084 | ||
4085 | #endif // GFLAGS |