]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | // | |
10 | ||
1e59de90 TL |
11 | #include <ios> |
12 | ||
13 | #include "util/compression.h" | |
f67539c2 TL |
14 | #ifdef GFLAGS |
15 | #include "db_stress_tool/db_stress_common.h" | |
20effc67 | 16 | #include "db_stress_tool/db_stress_compaction_filter.h" |
f67539c2 | 17 | #include "db_stress_tool/db_stress_driver.h" |
20effc67 | 18 | #include "db_stress_tool/db_stress_table_properties_collector.h" |
f67539c2 | 19 | #include "rocksdb/convenience.h" |
1e59de90 TL |
20 | #include "rocksdb/filter_policy.h" |
21 | #include "rocksdb/secondary_cache.h" | |
20effc67 | 22 | #include "rocksdb/sst_file_manager.h" |
1e59de90 TL |
23 | #include "rocksdb/types.h" |
24 | #include "rocksdb/utilities/object_registry.h" | |
25 | #include "rocksdb/utilities/write_batch_with_index.h" | |
26 | #include "test_util/testutil.h" | |
20effc67 | 27 | #include "util/cast_util.h" |
1e59de90 | 28 | #include "utilities/backup/backup_engine_impl.h" |
20effc67 | 29 | #include "utilities/fault_injection_fs.h" |
1e59de90 | 30 | #include "utilities/fault_injection_secondary_cache.h" |
f67539c2 TL |
31 | |
32 | namespace ROCKSDB_NAMESPACE { | |
1e59de90 TL |
33 | |
34 | namespace { | |
35 | ||
36 | std::shared_ptr<const FilterPolicy> CreateFilterPolicy() { | |
37 | if (FLAGS_bloom_bits < 0) { | |
38 | return BlockBasedTableOptions().filter_policy; | |
39 | } | |
40 | const FilterPolicy* new_policy; | |
41 | if (FLAGS_ribbon_starting_level >= 999) { | |
42 | // Use Bloom API | |
43 | new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false); | |
44 | } else { | |
45 | new_policy = NewRibbonFilterPolicy( | |
46 | FLAGS_bloom_bits, /* bloom_before_level */ FLAGS_ribbon_starting_level); | |
47 | } | |
48 | return std::shared_ptr<const FilterPolicy>(new_policy); | |
49 | } | |
50 | ||
51 | } // namespace | |
52 | ||
// Sets up the caches, filter policy and member defaults, then — when
// FLAGS_destroy_db_initially is set — wipes any pre-existing database at
// FLAGS_db so the stress run starts from a clean slate.
StressTest::StressTest()
    : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
      compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size,
                                    FLAGS_compressed_cache_numshardbits)),
      filter_policy_(CreateFilterPolicy()),
      db_(nullptr),
#ifndef ROCKSDB_LITE
      txn_db_(nullptr),
#endif
      db_aptr_(nullptr),
      clock_(db_stress_env->GetSystemClock().get()),
      new_column_family_name_(1),
      num_times_reopened_(0),
      db_preload_finished_(false),
      cmp_db_(nullptr),
      is_db_stopped_(false) {
  if (FLAGS_destroy_db_initially) {
    // First delete leftover "heap-" files from a previous run, then destroy
    // the DB itself.
    std::vector<std::string> files;
    db_stress_env->GetChildren(FLAGS_db, &files);
    for (unsigned int i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }

    Options options;
    options.env = db_stress_env;
    // Remove files without preserving manifest files
#ifndef ROCKSDB_LITE
    // BlobDB keeps extra files that plain DestroyDB() does not know about,
    // so it needs its own destroy entry point.
    const Status s = !FLAGS_use_blob_db
                         ? DestroyDB(FLAGS_db, options)
                         : blob_db::DestroyBlobDB(FLAGS_db, options,
                                                  blob_db::BlobDBOptions());
#else
    const Status s = DestroyDB(FLAGS_db, options);
#endif  // !ROCKSDB_LITE

    if (!s.ok()) {
      fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
      exit(1);
    }
  }
}
96 | ||
97 | StressTest::~StressTest() { | |
98 | for (auto cf : column_families_) { | |
99 | delete cf; | |
100 | } | |
101 | column_families_.clear(); | |
102 | delete db_; | |
103 | ||
f67539c2 TL |
104 | for (auto* cf : cmp_cfhs_) { |
105 | delete cf; | |
106 | } | |
107 | cmp_cfhs_.clear(); | |
108 | delete cmp_db_; | |
109 | } | |
110 | ||
1e59de90 TL |
111 | std::shared_ptr<Cache> StressTest::NewCache(size_t capacity, |
112 | int32_t num_shard_bits) { | |
113 | ConfigOptions config_options; | |
f67539c2 TL |
114 | if (capacity <= 0) { |
115 | return nullptr; | |
116 | } | |
1e59de90 TL |
117 | |
118 | if (FLAGS_cache_type == "clock_cache") { | |
119 | fprintf(stderr, "Old clock cache implementation has been removed.\n"); | |
120 | exit(1); | |
121 | } else if (FLAGS_cache_type == "hyper_clock_cache") { | |
122 | return HyperClockCacheOptions(static_cast<size_t>(capacity), | |
123 | FLAGS_block_size /*estimated_entry_charge*/, | |
124 | num_shard_bits) | |
125 | .MakeSharedCache(); | |
126 | } else if (FLAGS_cache_type == "lru_cache") { | |
127 | LRUCacheOptions opts; | |
128 | opts.capacity = capacity; | |
129 | opts.num_shard_bits = num_shard_bits; | |
130 | #ifndef ROCKSDB_LITE | |
131 | std::shared_ptr<SecondaryCache> secondary_cache; | |
132 | if (!FLAGS_secondary_cache_uri.empty()) { | |
133 | Status s = SecondaryCache::CreateFromString( | |
134 | config_options, FLAGS_secondary_cache_uri, &secondary_cache); | |
135 | if (secondary_cache == nullptr) { | |
136 | fprintf(stderr, | |
137 | "No secondary cache registered matching string: %s status=%s\n", | |
138 | FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str()); | |
139 | exit(1); | |
140 | } | |
141 | if (FLAGS_secondary_cache_fault_one_in > 0) { | |
142 | secondary_cache = std::make_shared<FaultInjectionSecondaryCache>( | |
143 | secondary_cache, static_cast<uint32_t>(FLAGS_seed), | |
144 | FLAGS_secondary_cache_fault_one_in); | |
145 | } | |
146 | opts.secondary_cache = secondary_cache; | |
f67539c2 | 147 | } |
1e59de90 TL |
148 | #endif |
149 | return NewLRUCache(opts); | |
f67539c2 | 150 | } else { |
1e59de90 TL |
151 | fprintf(stderr, "Cache type not supported."); |
152 | exit(1); | |
153 | } | |
154 | } | |
155 | ||
156 | std::vector<std::string> StressTest::GetBlobCompressionTags() { | |
157 | std::vector<std::string> compression_tags{"kNoCompression"}; | |
158 | ||
159 | if (Snappy_Supported()) { | |
160 | compression_tags.emplace_back("kSnappyCompression"); | |
161 | } | |
162 | if (LZ4_Supported()) { | |
163 | compression_tags.emplace_back("kLZ4Compression"); | |
164 | } | |
165 | if (ZSTD_Supported()) { | |
166 | compression_tags.emplace_back("kZSTD"); | |
f67539c2 | 167 | } |
1e59de90 TL |
168 | |
169 | return compression_tags; | |
f67539c2 TL |
170 | } |
171 | ||
// Builds the table of dynamically-settable options, and candidate values
// for each, that SetOptions() later picks from at random. No-op (returns
// true) when FLAGS_set_options_one_in <= 0. Candidate values are derived
// from the currently-configured options_ so they stay in a sensible range.
bool StressTest::BuildOptionsTable() {
  if (FLAGS_set_options_one_in <= 0) {
    return true;
  }

  std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
      {"write_buffer_size",
       {std::to_string(options_.write_buffer_size),
        std::to_string(options_.write_buffer_size * 2),
        std::to_string(options_.write_buffer_size * 4)}},
      {"max_write_buffer_number",
       {std::to_string(options_.max_write_buffer_number),
        std::to_string(options_.max_write_buffer_number * 2),
        std::to_string(options_.max_write_buffer_number * 4)}},
      {"arena_block_size",
       {
           std::to_string(options_.arena_block_size),
           std::to_string(options_.write_buffer_size / 4),
           std::to_string(options_.write_buffer_size / 8),
       }},
      {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}},
      {"max_successive_merges", {"0", "2", "4"}},
      {"inplace_update_num_locks", {"100", "200", "300"}},
      // TODO: re-enable once internal task T124324915 is fixed.
      // {"experimental_mempurge_threshold", {"0.0", "1.0"}},
      // TODO(ljin): enable test for this option
      // {"disable_auto_compactions", {"100", "200", "300"}},
      {"level0_file_num_compaction_trigger",
       {
           std::to_string(options_.level0_file_num_compaction_trigger),
           std::to_string(options_.level0_file_num_compaction_trigger + 2),
           std::to_string(options_.level0_file_num_compaction_trigger + 4),
       }},
      {"level0_slowdown_writes_trigger",
       {
           std::to_string(options_.level0_slowdown_writes_trigger),
           std::to_string(options_.level0_slowdown_writes_trigger + 2),
           std::to_string(options_.level0_slowdown_writes_trigger + 4),
       }},
      {"level0_stop_writes_trigger",
       {
           std::to_string(options_.level0_stop_writes_trigger),
           std::to_string(options_.level0_stop_writes_trigger + 2),
           std::to_string(options_.level0_stop_writes_trigger + 4),
       }},
      {"max_compaction_bytes",
       {
           std::to_string(options_.target_file_size_base * 5),
           std::to_string(options_.target_file_size_base * 15),
           std::to_string(options_.target_file_size_base * 100),
       }},
      {"target_file_size_base",
       {
           std::to_string(options_.target_file_size_base),
           std::to_string(options_.target_file_size_base * 2),
           std::to_string(options_.target_file_size_base * 4),
       }},
      {"target_file_size_multiplier",
       {
           std::to_string(options_.target_file_size_multiplier),
           "1",
           "2",
       }},
      {"max_bytes_for_level_base",
       {
           std::to_string(options_.max_bytes_for_level_base / 2),
           std::to_string(options_.max_bytes_for_level_base),
           std::to_string(options_.max_bytes_for_level_base * 2),
       }},
      {"max_bytes_for_level_multiplier",
       {
           std::to_string(options_.max_bytes_for_level_multiplier),
           "1",
           "2",
       }},
      {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
  };

  // Blob options are only exercised when dynamic blob option changes are
  // explicitly allowed by the flag.
  if (FLAGS_allow_setting_blob_options_dynamically) {
    options_tbl.emplace("enable_blob_files",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace("min_blob_size",
                        std::vector<std::string>{"0", "8", "16"});
    options_tbl.emplace("blob_file_size",
                        std::vector<std::string>{"1M", "16M", "256M", "1G"});
    options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
    options_tbl.emplace("enable_blob_garbage_collection",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace(
        "blob_garbage_collection_age_cutoff",
        std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_garbage_collection_force_threshold",
                        std::vector<std::string>{"0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_compaction_readahead_size",
                        std::vector<std::string>{"0", "1M", "4M"});
    options_tbl.emplace("blob_file_starting_level",
                        std::vector<std::string>{"0", "1", "2"});
    options_tbl.emplace("prepopulate_blob_cache",
                        std::vector<std::string>{"kDisable", "kFlushOnly"});
  }

  options_table_ = std::move(options_tbl);

  // options_index_ gives O(1) random access to the option names, which the
  // unordered map itself cannot provide.
  for (const auto& iter : options_table_) {
    options_index_.push_back(iter.first);
  }
  return true;
}
280 | ||
1e59de90 TL |
281 | void StressTest::InitDb(SharedState* shared) { |
282 | uint64_t now = clock_->NowMicros(); | |
f67539c2 | 283 | fprintf(stdout, "%s Initializing db_stress\n", |
1e59de90 | 284 | clock_->TimeToString(now / 1000000).c_str()); |
f67539c2 | 285 | PrintEnv(); |
1e59de90 | 286 | Open(shared); |
f67539c2 TL |
287 | BuildOptionsTable(); |
288 | } | |
289 | ||
20effc67 TL |
// Second stage of initialization, after Open(): optional read-only
// preload, restoring expected values from trace history, resolving
// transactions recovered in prepared state, and wiring the compaction
// filter factory to the shared state. Ordering here matters — see the
// inline comments.
void StressTest::FinishInitDb(SharedState* shared) {
  if (FLAGS_read_only) {
    uint64_t now = clock_->NowMicros();
    fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
            clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
    PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
  }

  if (shared->HasHistory()) {
    // The way it works right now is, if there's any history, that means the
    // previous run mutating the DB had all its operations traced, in which case
    // we should always be able to `Restore()` the expected values to match the
    // `db_`'s current seqno.
    Status s = shared->Restore(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error restoring historical expected values: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
#ifndef ROCKSDB_LITE
  if (FLAGS_use_txn) {
    // It's OK here without sync because unsynced data cannot be lost at this
    // point
    // - even with sync_fault_injection=1 as the
    // file is still directly writable until after FinishInitDb()
    ProcessRecoveredPreparedTxns(shared);
  }
#endif
  if (FLAGS_enable_compaction_filter) {
    auto* compaction_filter_factory =
        reinterpret_cast<DbStressCompactionFilterFactory*>(
            options_.compaction_filter_factory.get());
    assert(compaction_filter_factory);
    // This must be called only after any potential `SharedState::Restore()` has
    // completed in order for the `compaction_filter_factory` to operate on the
    // correct latest values file.
    compaction_filter_factory->SetSharedState(shared);
    fprintf(stdout, "Compaction filter factory: %s\n",
            compaction_filter_factory->Name());
  }
}
332 | ||
1e59de90 TL |
// Enables expected-state history tracing whenever this run can lose
// recent writes (sync fault injection, disabled WAL, or manual WAL flush).
// With `FLAGS_manual_wal_flush_one_in > 0`, WAL data can be lost when
// `FlushWAL()` is not explicitly called by users of RocksDB (in our case,
// db stress). Therefore recovery from such potential WAL data loss is a
// prefix recovery that requires tracing.
void StressTest::TrackExpectedState(SharedState* shared) {
  if ((FLAGS_sync_fault_injection || FLAGS_disable_wal ||
       FLAGS_manual_wal_flush_one_in > 0) &&
      IsStateTracked()) {
    Status s = shared->SaveAtAndAfter(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error enabling history tracing: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
}
350 | ||
// Verifies that reading `snap_state.key` from `db` at `snap_state.snapshot`
// still yields the status/value recorded in `snap_state`, and — when
// `snap_state.key_vec` is set — that the full set of present keys matches
// the recorded bit vector. Returns Status::Corruption on any mismatch,
// OK otherwise. Column families other than `snap_state.cf_at_name` are
// skipped (returns OK without checking).
Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
                              ThreadState::SnapshotState& snap_state) {
  Status s;
  if (cf->GetName() != snap_state.cf_at_name) {
    return s;
  }
  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ropt;
  ropt.snapshot = snap_state.snapshot;
  Slice ts;
  if (!snap_state.timestamp.empty()) {
    ts = snap_state.timestamp;
    ropt.timestamp = &ts;
  }
  // Pin a copy of the recorded value so it can be compared to the fresh read.
  PinnableSlice exp_v(&snap_state.value);
  exp_v.PinSelf();
  PinnableSlice v;
  s = db->Get(ropt, cf, snap_state.key, &v);
  if (!s.ok() && !s.IsNotFound()) {
    return s;  // real read error, not a verification mismatch
  }
  if (snap_state.status != s) {
    return Status::Corruption(
        "The snapshot gave inconsistent results for key " +
        std::to_string(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
        " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
        ") vs. (" + s.ToString() + ")");
  }
  if (s.ok()) {
    if (exp_v != v) {
      return Status::Corruption("The snapshot gave inconsistent values: (" +
                                exp_v.ToString() + ") vs. (" + v.ToString() +
                                ")");
    }
  }
  if (snap_state.key_vec != nullptr) {
    // When `prefix_extractor` is set, seeking to beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
    // Rebuild a presence bitmap over the key space and compare it to the
    // one captured with the snapshot.
    std::unique_ptr<std::vector<bool>> tmp_bitvec(
        new std::vector<bool>(FLAGS_max_key));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*tmp_bitvec.get())[key_val] = true;
      }
    }
    if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
                    tmp_bitvec.get()->begin())) {
      return Status::Corruption("Found inconsistent keys at this snapshot");
    }
  }
  return Status::OK();
}
407 | ||
408 | void StressTest::VerificationAbort(SharedState* shared, std::string msg, | |
409 | Status s) const { | |
410 | fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(), | |
411 | s.ToString().c_str()); | |
412 | shared->SetVerificationFailure(); | |
413 | } | |
414 | ||
415 | void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf, | |
416 | int64_t key) const { | |
20effc67 TL |
417 | auto key_str = Key(key); |
418 | Slice key_slice = key_str; | |
f67539c2 | 419 | fprintf(stderr, |
20effc67 TL |
420 | "Verification failed for column family %d key %s (%" PRIi64 "): %s\n", |
421 | cf, key_slice.ToString(true).c_str(), key, msg.c_str()); | |
f67539c2 TL |
422 | shared->SetVerificationFailure(); |
423 | } | |
424 | ||
1e59de90 TL |
// Reports a value mismatch for `key` in column family `cf`: prints the
// value read from the DB and the expected value (both hex-encoded), then
// flags the shared state so all worker threads stop.
void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
                                   int64_t key, Slice value_from_db,
                                   Slice value_from_expected) const {
  auto key_str = Key(key);
  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): value_from_db: %s, value_from_expected: %s, msg: %s\n",
          cf, Slice(key_str).ToString(true).c_str(), key,
          value_from_db.ToString(true).c_str(),
          value_from_expected.ToString(true).c_str(), msg.c_str());
  shared->SetVerificationFailure();
}
437 | ||
// Reports an inconsistency between a key's plain value and its wide
// columns (actual vs. expected, formatted by DebugString) and flags the
// shared state so all worker threads stop.
void StressTest::VerificationAbort(SharedState* shared, int cf, int64_t key,
                                   const Slice& value,
                                   const WideColumns& columns,
                                   const WideColumns& expected_columns) const {
  assert(shared);

  auto key_str = Key(key);

  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64
          "): Value and columns inconsistent: %s\n",
          cf, Slice(key_str).ToString(/* hex */ true).c_str(), key,
          DebugString(value, columns, expected_columns).c_str());

  shared->SetVerificationFailure();
}
454 | ||
455 | std::string StressTest::DebugString(const Slice& value, | |
456 | const WideColumns& columns, | |
457 | const WideColumns& expected_columns) { | |
458 | std::ostringstream oss; | |
459 | ||
460 | oss << "value: " << value.ToString(/* hex */ true); | |
461 | ||
462 | auto dump = [](const WideColumns& cols, std::ostream& os) { | |
463 | if (cols.empty()) { | |
464 | return; | |
465 | } | |
466 | ||
467 | os << std::hex; | |
468 | ||
469 | auto it = cols.begin(); | |
470 | os << *it; | |
471 | for (++it; it != cols.end(); ++it) { | |
472 | os << ' ' << *it; | |
473 | } | |
474 | }; | |
475 | ||
476 | oss << ", columns: "; | |
477 | dump(columns, oss); | |
478 | ||
479 | oss << ", expected_columns: "; | |
480 | dump(expected_columns, oss); | |
481 | ||
482 | return oss.str(); | |
483 | } | |
484 | ||
f67539c2 TL |
485 | void StressTest::PrintStatistics() { |
486 | if (dbstats) { | |
487 | fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); | |
488 | } | |
489 | if (dbstats_secondaries) { | |
490 | fprintf(stdout, "Secondary instances STATISTICS:\n%s\n", | |
491 | dbstats_secondaries->ToString().c_str()); | |
492 | } | |
493 | } | |
494 | ||
// Currently PreloadDb has to be single-threaded.
//
// Writes `number_of_keys` keys (value_base 0) into every column family —
// via Put, Merge, PutEntity or transactions depending on the flags — while
// mirroring each write into the expected state, then flushes, closes and
// reopens the DB (read-only path through Open()). Exits the process if any
// step fails.
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
                                              SharedState* shared) {
  WriteOptions write_opts;
  write_opts.disableWAL = FLAGS_disable_wal;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  if (FLAGS_rate_limit_auto_wal_flush) {
    write_opts.rate_limiter_priority = Env::IO_USER;
  }
  char value[100];
  int cf_idx = 0;
  Status s;
  for (auto cfh : column_families_) {
    for (int64_t k = 0; k != number_of_keys; ++k) {
      const std::string key = Key(k);

      constexpr uint32_t value_base = 0;
      const size_t sz = GenerateValue(value_base, value, sizeof(value));

      const Slice v(value, sz);

      // Mark the write as pending in the expected state before issuing it;
      // it is marked complete (pending=false) after the write returns.
      shared->Put(cf_idx, k, value_base, true /* pending */);

      std::string ts;
      if (FLAGS_user_timestamp_size > 0) {
        ts = GetNowNanos();
      }

      if (FLAGS_use_merge) {
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size > 0) {
            s = db_->Merge(write_opts, cfh, key, ts, v);
          } else {
            s = db_->Merge(write_opts, cfh, key, v);
          }
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Merge(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      } else if (FLAGS_use_put_entity_one_in > 0) {
        s = db_->PutEntity(write_opts, cfh, key,
                           GenerateWideColumns(value_base, v));
      } else {
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size > 0) {
            s = db_->Put(write_opts, cfh, key, ts, v);
          } else {
            s = db_->Put(write_opts, cfh, key, v);
          }
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Put(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      }

      shared->Put(cf_idx, k, value_base, false /* pending */);
      if (!s.ok()) {
        break;
      }
    }
    if (!s.ok()) {
      break;
    }
    ++cf_idx;
  }
  if (s.ok()) {
    s = db_->Flush(FlushOptions(), column_families_);
  }
  if (s.ok()) {
    // Close everything before reopening; Open() reads FLAGS_read_only.
    for (auto cf : column_families_) {
      delete cf;
    }
    column_families_.clear();
    delete db_;
    db_ = nullptr;
#ifndef ROCKSDB_LITE
    txn_db_ = nullptr;
#endif

    db_preload_finished_.store(true);
    auto now = clock_->NowMicros();
    fprintf(stdout, "%s Reopening database in read-only\n",
            clock_->TimeToString(now / 1000000).c_str());
    // Reopen as read-only, can ignore all options related to updates
    Open(shared);
  } else {
    fprintf(stderr, "Failed to preload db");
    exit(1);
  }
}
603 | ||
// Randomly picks an option name from options_index_ and one of its
// candidate values (built by BuildOptionsTable()), then applies it to a
// random column family via DB::SetOptions(). The three level0_*_trigger
// options are always set together using the same value index so their
// relative ordering stays consistent.
Status StressTest::SetOptions(ThreadState* thread) {
  assert(FLAGS_set_options_one_in > 0);
  std::unordered_map<std::string, std::string> opts;
  std::string name =
      options_index_[thread->rand.Next() % options_index_.size()];
  int value_idx = thread->rand.Next() % options_table_[name].size();
  if (name == "level0_file_num_compaction_trigger" ||
      name == "level0_slowdown_writes_trigger" ||
      name == "level0_stop_writes_trigger") {
    opts["level0_file_num_compaction_trigger"] =
        options_table_["level0_file_num_compaction_trigger"][value_idx];
    opts["level0_slowdown_writes_trigger"] =
        options_table_["level0_slowdown_writes_trigger"][value_idx];
    opts["level0_stop_writes_trigger"] =
        options_table_["level0_stop_writes_trigger"][value_idx];
  } else {
    opts[name] = options_table_[name][value_idx];
  }

  int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
  auto cfh = column_families_[rand_cf_idx];
  return db_->SetOptions(cfh, opts);
}
627 | ||
628 | #ifndef ROCKSDB_LITE | |
1e59de90 TL |
// Resolves all transactions recovered in two-phase-prepared state after
// opening the TransactionDB: each one is committed or rolled back (decided
// randomly in the helper). Afterwards, asserts that no prepared
// transactions remain.
void StressTest::ProcessRecoveredPreparedTxns(SharedState* shared) {
  assert(txn_db_);
  std::vector<Transaction*> recovered_prepared_trans;
  txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
  for (Transaction* txn : recovered_prepared_trans) {
    ProcessRecoveredPreparedTxnsHelper(txn, shared);
    delete txn;
  }
  recovered_prepared_trans.clear();
  // Re-query to confirm every prepared transaction was resolved above.
  txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
  assert(recovered_prepared_trans.size() == 0);
}
641 | ||
// For one recovered prepared transaction: marks every key in its write
// batch as pending in the expected state (per column family), then commits
// or rolls it back with 50/50 probability.
void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
                                                    SharedState* shared) {
  thread_local Random rand(static_cast<uint32_t>(FLAGS_seed));
  for (size_t i = 0; i < column_families_.size(); ++i) {
    std::unique_ptr<WBWIIterator> wbwi_iter(
        txn->GetWriteBatch()->NewIterator(column_families_[i]));
    for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) {
      uint64_t key_val;
      if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) {
        shared->Put(static_cast<int>(i) /* cf_idx */, key_val,
                    0 /* value_base */, true /* pending */);
      }
    }
  }
  // Randomly choose the outcome; both paths are expected to succeed.
  if (rand.OneIn(2)) {
    Status s = txn->Commit();
    assert(s.ok());
  } else {
    Status s = txn->Rollback();
    assert(s.ok());
  }
}
664 | ||
f67539c2 TL |
// Begins a new named transaction on txn_db_ into `*txn`, with a 10-minute
// lock timeout and deadlock detection enabled. Names are "xid<N>" using a
// process-wide monotonically increasing counter. Returns InvalidArgument
// when transactions are not enabled for this run.
Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
  }
  write_opts.disableWAL = FLAGS_disable_wal;
  static std::atomic<uint64_t> txn_id = {0};
  TransactionOptions txn_options;
  txn_options.use_only_the_last_commit_time_batch_for_recovery =
      FLAGS_use_only_the_last_commit_time_batch_for_recovery;
  txn_options.lock_timeout = 600000;  // 10 min
  txn_options.deadlock_detect = true;
  *txn = txn_db_->BeginTransaction(write_opts, txn_options);
  auto istr = std::to_string(txn_id.fetch_add(1));
  Status s = (*txn)->SetName("xid" + istr);
  return s;
}
681 | ||
1e59de90 | 682 | Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { |
f67539c2 TL |
683 | if (!FLAGS_use_txn) { |
684 | return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); | |
685 | } | |
1e59de90 | 686 | assert(txn_db_); |
f67539c2 | 687 | Status s = txn->Prepare(); |
1e59de90 | 688 | std::shared_ptr<const Snapshot> timestamped_snapshot; |
f67539c2 | 689 | if (s.ok()) { |
1e59de90 TL |
690 | if (thread && FLAGS_create_timestamped_snapshot_one_in && |
691 | thread->rand.OneIn(FLAGS_create_timestamped_snapshot_one_in)) { | |
692 | uint64_t ts = db_stress_env->NowNanos(); | |
693 | s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, | |
694 | ×tamped_snapshot); | |
695 | ||
696 | std::pair<Status, std::shared_ptr<const Snapshot>> res; | |
697 | if (thread->tid == 0) { | |
698 | uint64_t now = db_stress_env->NowNanos(); | |
699 | res = txn_db_->CreateTimestampedSnapshot(now); | |
700 | if (res.first.ok()) { | |
701 | assert(res.second); | |
702 | assert(res.second->GetTimestamp() == now); | |
703 | if (timestamped_snapshot) { | |
704 | assert(res.second->GetTimestamp() > | |
705 | timestamped_snapshot->GetTimestamp()); | |
706 | } | |
707 | } else { | |
708 | assert(!res.second); | |
709 | } | |
710 | } | |
711 | } else { | |
712 | s = txn->Commit(); | |
713 | } | |
714 | } | |
715 | if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && | |
716 | thread->rand.OneInOpt(50000)) { | |
717 | uint64_t now = db_stress_env->NowNanos(); | |
718 | constexpr uint64_t time_diff = static_cast<uint64_t>(1000) * 1000 * 1000; | |
719 | txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); | |
f67539c2 TL |
720 | } |
721 | delete txn; | |
722 | return s; | |
723 | } | |
724 | ||
725 | Status StressTest::RollbackTxn(Transaction* txn) { | |
726 | if (!FLAGS_use_txn) { | |
727 | return Status::InvalidArgument( | |
728 | "RollbackTxn when FLAGS_use_txn is not" | |
729 | " set"); | |
730 | } | |
731 | Status s = txn->Rollback(); | |
732 | delete txn; | |
733 | return s; | |
734 | } | |
735 | #endif | |
736 | ||
737 | void StressTest::OperateDb(ThreadState* thread) { | |
738 | ReadOptions read_opts(FLAGS_verify_checksum, true); | |
1e59de90 TL |
739 | read_opts.rate_limiter_priority = |
740 | FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; | |
741 | read_opts.async_io = FLAGS_async_io; | |
742 | read_opts.adaptive_readahead = FLAGS_adaptive_readahead; | |
743 | read_opts.readahead_size = FLAGS_readahead_size; | |
f67539c2 | 744 | WriteOptions write_opts; |
1e59de90 TL |
745 | if (FLAGS_rate_limit_auto_wal_flush) { |
746 | write_opts.rate_limiter_priority = Env::IO_USER; | |
747 | } | |
f67539c2 TL |
748 | auto shared = thread->shared; |
749 | char value[100]; | |
750 | std::string from_db; | |
751 | if (FLAGS_sync) { | |
752 | write_opts.sync = true; | |
753 | } | |
754 | write_opts.disableWAL = FLAGS_disable_wal; | |
1e59de90 TL |
755 | write_opts.protection_bytes_per_key = FLAGS_batch_protection_bytes_per_key; |
756 | const int prefix_bound = static_cast<int>(FLAGS_readpercent) + | |
757 | static_cast<int>(FLAGS_prefixpercent); | |
758 | const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent); | |
759 | const int del_bound = write_bound + static_cast<int>(FLAGS_delpercent); | |
760 | const int delrange_bound = | |
761 | del_bound + static_cast<int>(FLAGS_delrangepercent); | |
762 | const int iterate_bound = | |
763 | delrange_bound + static_cast<int>(FLAGS_iterpercent); | |
764 | ||
f67539c2 TL |
765 | const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1); |
766 | ||
20effc67 TL |
767 | #ifndef NDEBUG |
768 | if (FLAGS_read_fault_one_in) { | |
769 | fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), | |
1e59de90 TL |
770 | FLAGS_read_fault_one_in); |
771 | } | |
772 | #endif // NDEBUG | |
773 | if (FLAGS_write_fault_one_in) { | |
774 | IOStatus error_msg; | |
775 | if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) { | |
776 | error_msg = IOStatus::IOError("Retryable IO Error"); | |
777 | error_msg.SetRetryable(true); | |
778 | } else if (FLAGS_injest_error_severity == 2) { | |
779 | // Ingest the fatal error | |
780 | error_msg = IOStatus::IOError("Fatal IO Error"); | |
781 | error_msg.SetDataLoss(true); | |
782 | } | |
783 | std::vector<FileType> types = {FileType::kTableFile, | |
784 | FileType::kDescriptorFile, | |
785 | FileType::kCurrentFile}; | |
786 | fault_fs_guard->SetRandomWriteError( | |
787 | thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, | |
788 | /*inject_for_all_file_types=*/false, types); | |
20effc67 | 789 | } |
f67539c2 TL |
790 | thread->stats.Start(); |
791 | for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { | |
792 | if (thread->shared->HasVerificationFailedYet() || | |
793 | thread->shared->ShouldStopTest()) { | |
794 | break; | |
795 | } | |
796 | if (open_cnt != 0) { | |
797 | thread->stats.FinishedSingleOp(); | |
798 | MutexLock l(thread->shared->GetMutex()); | |
799 | while (!thread->snapshot_queue.empty()) { | |
800 | db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot); | |
801 | delete thread->snapshot_queue.front().second.key_vec; | |
802 | thread->snapshot_queue.pop(); | |
803 | } | |
804 | thread->shared->IncVotedReopen(); | |
805 | if (thread->shared->AllVotedReopen()) { | |
806 | thread->shared->GetStressTest()->Reopen(thread); | |
807 | thread->shared->GetCondVar()->SignalAll(); | |
808 | } else { | |
809 | thread->shared->GetCondVar()->Wait(); | |
810 | } | |
811 | // Commenting this out as we don't want to reset stats on each open. | |
812 | // thread->stats.Start(); | |
813 | } | |
814 | ||
815 | for (uint64_t i = 0; i < ops_per_open; i++) { | |
816 | if (thread->shared->HasVerificationFailedYet()) { | |
817 | break; | |
818 | } | |
819 | ||
820 | // Change Options | |
821 | if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) { | |
822 | SetOptions(thread); | |
823 | } | |
824 | ||
825 | if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) { | |
826 | options_.inplace_update_support ^= options_.inplace_update_support; | |
827 | } | |
828 | ||
829 | if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 && | |
830 | thread->rand.OneIn(FLAGS_verify_db_one_in)) { | |
831 | ContinuouslyVerifyDb(thread); | |
832 | if (thread->shared->ShouldStopTest()) { | |
833 | break; | |
834 | } | |
835 | } | |
836 | ||
837 | MaybeClearOneColumnFamily(thread); | |
838 | ||
1e59de90 TL |
839 | if (thread->rand.OneInOpt(FLAGS_manual_wal_flush_one_in)) { |
840 | bool sync = thread->rand.OneIn(2) ? true : false; | |
841 | Status s = db_->FlushWAL(sync); | |
842 | if (!s.ok() && !(sync && s.IsNotSupported())) { | |
843 | fprintf(stderr, "FlushWAL(sync=%s) failed: %s\n", | |
844 | (sync ? "true" : "false"), s.ToString().c_str()); | |
845 | } | |
846 | } | |
847 | ||
f67539c2 TL |
848 | if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { |
849 | Status s = db_->SyncWAL(); | |
850 | if (!s.ok() && !s.IsNotSupported()) { | |
851 | fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str()); | |
852 | } | |
853 | } | |
854 | ||
855 | int rand_column_family = thread->rand.Next() % FLAGS_column_families; | |
856 | ColumnFamilyHandle* column_family = column_families_[rand_column_family]; | |
857 | ||
858 | if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) { | |
859 | TestCompactFiles(thread, column_family); | |
860 | } | |
861 | ||
862 | int64_t rand_key = GenerateOneKey(thread, i); | |
863 | std::string keystr = Key(rand_key); | |
864 | Slice key = keystr; | |
f67539c2 TL |
865 | |
866 | if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) { | |
867 | TestCompactRange(thread, rand_key, key, column_family); | |
868 | if (thread->shared->HasVerificationFailedYet()) { | |
869 | break; | |
870 | } | |
871 | } | |
872 | ||
873 | std::vector<int> rand_column_families = | |
874 | GenerateColumnFamilies(FLAGS_column_families, rand_column_family); | |
875 | ||
876 | if (thread->rand.OneInOpt(FLAGS_flush_one_in)) { | |
877 | Status status = TestFlush(rand_column_families); | |
878 | if (!status.ok()) { | |
879 | fprintf(stdout, "Unable to perform Flush(): %s\n", | |
880 | status.ToString().c_str()); | |
881 | } | |
882 | } | |
883 | ||
884 | #ifndef ROCKSDB_LITE | |
20effc67 | 885 | // Verify GetLiveFiles with a 1 in N chance. |
1e59de90 TL |
886 | if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) && |
887 | !FLAGS_write_fault_one_in) { | |
20effc67 | 888 | Status status = VerifyGetLiveFiles(); |
f67539c2 | 889 | if (!status.ok()) { |
20effc67 TL |
890 | VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status); |
891 | } | |
892 | } | |
893 | ||
894 | // Verify GetSortedWalFiles with a 1 in N chance. | |
895 | if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) { | |
896 | Status status = VerifyGetSortedWalFiles(); | |
897 | if (!status.ok()) { | |
898 | VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK", | |
899 | status); | |
900 | } | |
901 | } | |
902 | ||
903 | // Verify GetCurrentWalFile with a 1 in N chance. | |
904 | if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) { | |
905 | Status status = VerifyGetCurrentWalFile(); | |
906 | if (!status.ok()) { | |
907 | VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK", | |
f67539c2 TL |
908 | status); |
909 | } | |
910 | } | |
911 | #endif // !ROCKSDB_LITE | |
912 | ||
913 | if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) { | |
914 | Status status = TestPauseBackground(thread); | |
915 | if (!status.ok()) { | |
916 | VerificationAbort( | |
917 | shared, "Pause/ContinueBackgroundWork status not OK", status); | |
918 | } | |
919 | } | |
920 | ||
921 | #ifndef ROCKSDB_LITE | |
922 | if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) { | |
923 | Status status = db_->VerifyChecksum(); | |
924 | if (!status.ok()) { | |
925 | VerificationAbort(shared, "VerifyChecksum status not OK", status); | |
926 | } | |
927 | } | |
20effc67 TL |
928 | |
929 | if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) { | |
930 | TestGetProperty(thread); | |
931 | } | |
f67539c2 TL |
932 | #endif |
933 | ||
934 | std::vector<int64_t> rand_keys = GenerateKeys(rand_key); | |
935 | ||
936 | if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) { | |
1e59de90 | 937 | TestIngestExternalFile(thread, rand_column_families, rand_keys); |
f67539c2 TL |
938 | } |
939 | ||
940 | if (thread->rand.OneInOpt(FLAGS_backup_one_in)) { | |
20effc67 TL |
941 | // Beyond a certain DB size threshold, this test becomes heavier than |
942 | // it's worth. | |
943 | uint64_t total_size = 0; | |
944 | if (FLAGS_backup_max_size > 0) { | |
945 | std::vector<FileAttributes> files; | |
946 | db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files); | |
947 | for (auto& file : files) { | |
948 | total_size += file.size_bytes; | |
949 | } | |
950 | } | |
951 | ||
952 | if (total_size <= FLAGS_backup_max_size) { | |
953 | Status s = TestBackupRestore(thread, rand_column_families, rand_keys); | |
954 | if (!s.ok()) { | |
955 | VerificationAbort(shared, "Backup/restore gave inconsistent state", | |
956 | s); | |
957 | } | |
f67539c2 TL |
958 | } |
959 | } | |
960 | ||
961 | if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) { | |
962 | Status s = TestCheckpoint(thread, rand_column_families, rand_keys); | |
963 | if (!s.ok()) { | |
964 | VerificationAbort(shared, "Checkpoint gave inconsistent state", s); | |
965 | } | |
966 | } | |
967 | ||
968 | #ifndef ROCKSDB_LITE | |
969 | if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) { | |
970 | Status s = | |
971 | TestApproximateSize(thread, i, rand_column_families, rand_keys); | |
972 | if (!s.ok()) { | |
973 | VerificationAbort(shared, "ApproximateSize Failed", s); | |
974 | } | |
975 | } | |
976 | #endif // !ROCKSDB_LITE | |
977 | if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) { | |
978 | TestAcquireSnapshot(thread, rand_column_family, keystr, i); | |
979 | } | |
980 | ||
981 | /*always*/ { | |
982 | Status s = MaybeReleaseSnapshots(thread, i); | |
983 | if (!s.ok()) { | |
984 | VerificationAbort(shared, "Snapshot gave inconsistent state", s); | |
985 | } | |
986 | } | |
987 | ||
1e59de90 TL |
988 | // Assign timestamps if necessary. |
989 | std::string read_ts_str; | |
990 | Slice read_ts; | |
991 | if (FLAGS_user_timestamp_size > 0) { | |
992 | read_ts_str = GetNowNanos(); | |
993 | read_ts = read_ts_str; | |
994 | read_opts.timestamp = &read_ts; | |
995 | } | |
996 | ||
f67539c2 TL |
997 | int prob_op = thread->rand.Uniform(100); |
998 | // Reset this in case we pick something other than a read op. We don't | |
999 | // want to use a stale value when deciding at the beginning of the loop | |
1000 | // whether to vote to reopen | |
1001 | if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) { | |
1002 | assert(0 <= prob_op); | |
1003 | // OPERATION read | |
1004 | if (FLAGS_use_multiget) { | |
1005 | // Leave room for one more iteration of the loop with a single key | |
1006 | // batch. This is to ensure that each thread does exactly the same | |
1007 | // number of ops | |
1008 | int multiget_batch_size = static_cast<int>( | |
1009 | std::min(static_cast<uint64_t>(thread->rand.Uniform(64)), | |
1010 | FLAGS_ops_per_thread - i - 1)); | |
1011 | // If its the last iteration, ensure that multiget_batch_size is 1 | |
1012 | multiget_batch_size = std::max(multiget_batch_size, 1); | |
1013 | rand_keys = GenerateNKeys(thread, multiget_batch_size, i); | |
1014 | TestMultiGet(thread, read_opts, rand_column_families, rand_keys); | |
1015 | i += multiget_batch_size - 1; | |
1016 | } else { | |
1017 | TestGet(thread, read_opts, rand_column_families, rand_keys); | |
1018 | } | |
1e59de90 | 1019 | } else if (prob_op < prefix_bound) { |
f67539c2 TL |
1020 | assert(static_cast<int>(FLAGS_readpercent) <= prob_op); |
1021 | // OPERATION prefix scan | |
1022 | // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are | |
1023 | // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will | |
1024 | // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same | |
1025 | // prefix | |
1026 | TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); | |
1e59de90 TL |
1027 | } else if (prob_op < write_bound) { |
1028 | assert(prefix_bound <= prob_op); | |
f67539c2 TL |
1029 | // OPERATION write |
1030 | TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, | |
1e59de90 TL |
1031 | value); |
1032 | } else if (prob_op < del_bound) { | |
1033 | assert(write_bound <= prob_op); | |
f67539c2 | 1034 | // OPERATION delete |
1e59de90 TL |
1035 | TestDelete(thread, write_opts, rand_column_families, rand_keys); |
1036 | } else if (prob_op < delrange_bound) { | |
1037 | assert(del_bound <= prob_op); | |
f67539c2 | 1038 | // OPERATION delete range |
1e59de90 TL |
1039 | TestDeleteRange(thread, write_opts, rand_column_families, rand_keys); |
1040 | } else if (prob_op < iterate_bound) { | |
1041 | assert(delrange_bound <= prob_op); | |
f67539c2 | 1042 | // OPERATION iterate |
1e59de90 TL |
1043 | if (!FLAGS_skip_verifydb && |
1044 | thread->rand.OneInOpt( | |
1045 | FLAGS_verify_iterator_with_expected_state_one_in)) { | |
1046 | TestIterateAgainstExpected(thread, read_opts, rand_column_families, | |
1047 | rand_keys); | |
1048 | } else { | |
1049 | int num_seeks = static_cast<int>( | |
1050 | std::min(static_cast<uint64_t>(thread->rand.Uniform(4)), | |
1051 | FLAGS_ops_per_thread - i - 1)); | |
1052 | rand_keys = GenerateNKeys(thread, num_seeks, i); | |
1053 | i += num_seeks - 1; | |
1054 | TestIterate(thread, read_opts, rand_column_families, rand_keys); | |
f67539c2 | 1055 | } |
1e59de90 TL |
1056 | } else { |
1057 | assert(iterate_bound <= prob_op); | |
1058 | TestCustomOperations(thread, rand_column_families); | |
f67539c2 | 1059 | } |
1e59de90 | 1060 | thread->stats.FinishedSingleOp(); |
f67539c2 TL |
1061 | } |
1062 | } | |
1063 | while (!thread->snapshot_queue.empty()) { | |
1064 | db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot); | |
1065 | delete thread->snapshot_queue.front().second.key_vec; | |
1066 | thread->snapshot_queue.pop(); | |
1067 | } | |
1068 | ||
1069 | thread->stats.Stop(); | |
1070 | } | |
1071 | ||
1072 | #ifndef ROCKSDB_LITE | |
1073 | // Generated a list of keys that close to boundaries of SST keys. | |
1074 | // If there isn't any SST file in the DB, return empty list. | |
1075 | std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread, | |
1076 | DB* db, | |
1077 | ColumnFamilyHandle* cfh, | |
1078 | size_t num_keys) { | |
1079 | ColumnFamilyMetaData cfmd; | |
1080 | db->GetColumnFamilyMetaData(cfh, &cfmd); | |
1081 | std::vector<std::string> boundaries; | |
1082 | for (const LevelMetaData& lmd : cfmd.levels) { | |
1083 | for (const SstFileMetaData& sfmd : lmd.files) { | |
1e59de90 TL |
1084 | // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey |
1085 | // have timestamps. | |
1086 | const auto& skey = sfmd.smallestkey; | |
1087 | const auto& lkey = sfmd.largestkey; | |
1088 | assert(skey.size() >= FLAGS_user_timestamp_size); | |
1089 | assert(lkey.size() >= FLAGS_user_timestamp_size); | |
1090 | boundaries.push_back( | |
1091 | skey.substr(0, skey.size() - FLAGS_user_timestamp_size)); | |
1092 | boundaries.push_back( | |
1093 | lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size)); | |
f67539c2 TL |
1094 | } |
1095 | } | |
1096 | if (boundaries.empty()) { | |
1097 | return {}; | |
1098 | } | |
1099 | ||
1100 | std::vector<std::string> ret; | |
1101 | for (size_t j = 0; j < num_keys; j++) { | |
1102 | std::string k = | |
1103 | boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))]; | |
1104 | if (thread->rand.OneIn(3)) { | |
1105 | // Reduce one byte from the string | |
1106 | for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) { | |
1107 | uint8_t cur = k[i]; | |
1108 | if (cur > 0) { | |
1109 | k[i] = static_cast<char>(cur - 1); | |
1110 | break; | |
1111 | } else if (i > 0) { | |
1112 | k[i] = 0xFFu; | |
1113 | } | |
1114 | } | |
1115 | } else if (thread->rand.OneIn(2)) { | |
1116 | // Add one byte to the string | |
1117 | for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) { | |
1118 | uint8_t cur = k[i]; | |
1119 | if (cur < 255) { | |
1120 | k[i] = static_cast<char>(cur + 1); | |
1121 | break; | |
1122 | } else if (i > 0) { | |
1123 | k[i] = 0x00; | |
1124 | } | |
1125 | } | |
1126 | } | |
1127 | ret.push_back(k); | |
1128 | } | |
1129 | return ret; | |
1130 | } | |
1131 | #endif // !ROCKSDB_LITE | |
1132 | ||
// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations. Each position is
// cross-checked against a "control" iterator opened with total-order seek
// and no bounds; any divergence is flagged via VerifyIterator(). The exact
// sequence of thread->rand calls is significant for reproducibility.
Status StressTest::TestIterate(ThreadState* thread,
                               const ReadOptions& read_opts,
                               const std::vector<int>& rand_column_families,
                               const std::vector<int64_t>& rand_keys) {
  assert(!rand_column_families.empty());
  assert(!rand_keys.empty());

  // Pin a snapshot so both iterators observe the same DB state; released
  // automatically when the guard goes out of scope.
  ManagedSnapshot snapshot_guard(db_);

  ReadOptions ro = read_opts;
  ro.snapshot = snapshot_guard.snapshot();

  std::string read_ts_str;
  Slice read_ts_slice;
  MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, ro);

  // expect_total_order is true whenever the iterator under test should see
  // keys in full comparator order (no prefix filtering).
  bool expect_total_order = false;
  if (thread->rand.OneIn(16)) {
    // When prefix extractor is used, it's useful to cover total order seek.
    ro.total_order_seek = true;
    expect_total_order = true;
  } else if (thread->rand.OneIn(4)) {
    ro.total_order_seek = false;
    ro.auto_prefix_mode = true;
    expect_total_order = true;
  } else if (options_.prefix_extractor.get() == nullptr) {
    expect_total_order = true;
  }

  std::string upper_bound_str;
  Slice upper_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1/16 chance, set an iterator upper bound.
    // Note: upper_bound can be smaller than the seek key.
    const int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    upper_bound_str = Key(rand_upper_key);
    upper_bound = Slice(upper_bound_str);
    ro.iterate_upper_bound = &upper_bound;
  }
  std::string lower_bound_str;
  Slice lower_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1/16 chance, enable iterator lower bound.
    // Note: lower_bound can be greater than the seek key.
    const int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    lower_bound_str = Key(rand_lower_key);
    lower_bound = Slice(lower_bound_str);
    ro.iterate_lower_bound = &lower_bound;
  }

  ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
  assert(cfh);

  std::unique_ptr<Iterator> iter(db_->NewIterator(ro, cfh));

  std::vector<std::string> key_strs;
  if (thread->rand.OneIn(16)) {
    // Generate keys close to lower or upper bound of SST files.
    key_strs = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
  }
  if (key_strs.empty()) {
    // Use the random keys passed in.
    for (int64_t rkey : rand_keys) {
      key_strs.push_back(Key(rkey));
    }
  }

  // op_logs accumulates a human-readable trace of the operations performed,
  // printed on divergence to aid debugging.
  std::string op_logs;
  constexpr size_t kOpLogsLimit = 10000;

  for (const std::string& key_str : key_strs) {
    if (op_logs.size() > kOpLogsLimit) {
      // Shouldn't take too much memory for the history log. Clear it.
      op_logs = "(cleared...)\n";
    }

    if (ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
      // With a 1/2 chance, change the upper bound.
      // It is possible that it is changed before first use, but there is no
      // problem with that. Mutating the Slice in place exercises iterators'
      // handling of bounds changing between seeks.
      const int64_t rand_upper_key =
          GenerateOneKey(thread, FLAGS_ops_per_thread);
      upper_bound_str = Key(rand_upper_key);
      upper_bound = Slice(upper_bound_str);
    }
    if (ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
      // With a 1/4 chance, change the lower bound.
      // It is possible that it is changed before first use, but there is no
      // problem with that.
      const int64_t rand_lower_key =
          GenerateOneKey(thread, FLAGS_ops_per_thread);
      lower_bound_str = Key(rand_lower_key);
      lower_bound = Slice(lower_bound_str);
    }

    // Record some options to op_logs
    op_logs += "total_order_seek: ";
    op_logs += (ro.total_order_seek ? "1 " : "0 ");
    op_logs += "auto_prefix_mode: ";
    op_logs += (ro.auto_prefix_mode ? "1 " : "0 ");
    if (ro.iterate_upper_bound != nullptr) {
      op_logs += "ub: " + upper_bound.ToString(true) + " ";
    }
    if (ro.iterate_lower_bound != nullptr) {
      op_logs += "lb: " + lower_bound.ToString(true) + " ";
    }

    // Set up an iterator, perform the same operations without bounds and with
    // total order seek, and compare the results. This is to identify bugs
    // related to bounds, prefix extractor, or reseeking. Sometimes we are
    // comparing iterators with the same set-up, and it doesn't hurt to check
    // them to be equal.
    //
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions cmp_ro;
    cmp_ro.timestamp = ro.timestamp;
    cmp_ro.iter_start_ts = ro.iter_start_ts;
    cmp_ro.snapshot = snapshot_guard.snapshot();
    cmp_ro.total_order_seek = true;

    ColumnFamilyHandle* const cmp_cfh =
        GetControlCfh(thread, rand_column_families[0]);
    assert(cmp_cfh);

    std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));

    bool diverged = false;

    Slice key(key_str);

    // SeekToFirst/SeekToLast are only well-defined against the control
    // iterator under total-order semantics.
    const bool support_seek_first_or_last = expect_total_order;

    LastIterateOp last_op;
    if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToFirst();
      cmp_iter->SeekToFirst();
      last_op = kLastOpSeekToFirst;
      op_logs += "STF ";
    } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToLast();
      cmp_iter->SeekToLast();
      last_op = kLastOpSeekToLast;
      op_logs += "STL ";
    } else if (thread->rand.OneIn(8)) {
      iter->SeekForPrev(key);
      cmp_iter->SeekForPrev(key);
      last_op = kLastOpSeekForPrev;
      op_logs += "SFP " + key.ToString(true) + " ";
    } else {
      iter->Seek(key);
      cmp_iter->Seek(key);
      last_op = kLastOpSeek;
      op_logs += "S " + key.ToString(true) + " ";
    }

    VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                   key, op_logs, &diverged);

    // prefix_hash memtables do not support reverse iteration in prefix mode.
    const bool no_reverse =
        (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); ++i) {
      if (no_reverse || thread->rand.OneIn(2)) {
        iter->Next();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Next();
        }
        op_logs += "N";
      } else {
        iter->Prev();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Prev();
        }
        op_logs += "P";
      }

      last_op = kLastOpNextOrPrev;

      VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
                     key, op_logs, &diverged);
    }

    thread->stats.AddIterations(1);

    op_logs += "; ";
  }

  return Status::OK();
}
1326 | ||
1327 | #ifndef ROCKSDB_LITE | |
20effc67 TL |
1328 | // Test the return status of GetLiveFiles. |
1329 | Status StressTest::VerifyGetLiveFiles() const { | |
1330 | std::vector<std::string> live_file; | |
1331 | uint64_t manifest_size = 0; | |
1332 | return db_->GetLiveFiles(live_file, &manifest_size); | |
1333 | } | |
f67539c2 | 1334 | |
20effc67 TL |
1335 | // Test the return status of GetSortedWalFiles. |
1336 | Status StressTest::VerifyGetSortedWalFiles() const { | |
1337 | VectorLogPtr log_ptr; | |
1338 | return db_->GetSortedWalFiles(log_ptr); | |
1339 | } | |
f67539c2 | 1340 | |
20effc67 TL |
1341 | // Test the return status of GetCurrentWalFile. |
1342 | Status StressTest::VerifyGetCurrentWalFile() const { | |
1343 | std::unique_ptr<LogFile> cur_wal_file; | |
1344 | return db_->GetCurrentWalFile(&cur_wal_file); | |
f67539c2 TL |
1345 | } |
1346 | #endif // !ROCKSDB_LITE | |
1347 | ||
// Compares the iterator under test (`iter`) against the total-order control
// iterator (`cmp_iter`); both are expected to be at the same logical
// position unless `iter` was legitimately made invalid or undefined by
// upper/lower bounds or the prefix extractor. On mismatch this flags a
// verification failure (fail fast, preserving DB state). On exit, *diverged
// is true if the two iterators can no longer be meaningfully compared —
// whether due to a real failure or a known-undefined scenario.
void StressTest::VerifyIterator(ThreadState* thread,
                                ColumnFamilyHandle* cmp_cfh,
                                const ReadOptions& ro, Iterator* iter,
                                Iterator* cmp_iter, LastIterateOp op,
                                const Slice& seek_key,
                                const std::string& op_logs, bool* diverged) {
  assert(diverged);

  // Once diverged, there is nothing further to compare.
  if (*diverged) {
    return;
  }

  if (ro.iter_start_ts != nullptr) {
    assert(FLAGS_user_timestamp_size > 0);
    // We currently do not verify iterator when dumping history of internal
    // keys.
    *diverged = true;
    return;
  }

  // The following cases are combinations of seek type and bounds whose
  // behavior is not well defined; skip verification for them.
  if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
    // SeekToFirst() with lower bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
    // SeekToLast() with upper bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) >= 0 ||
              (ro.iterate_upper_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts*/ false) >= 0))) {
    // Lower bound behavior is not well defined if it is larger than
    // seek key or upper bound. Disable the check for now.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) <= 0 ||
              (ro.iterate_lower_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Upper bound behavior is not well defined if it is smaller than
    // seek key or lower bound. Disable the check for now.
    *diverged = true;
    return;
  }

  // pe is the effective prefix extractor: null when the iterator under test
  // ran in total-order or auto-prefix mode (no prefix filtering expected).
  const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
                                 ? nullptr
                                 : options_.prefix_extractor.get();
  const Comparator* cmp = options_.comparator;

  if (iter->Valid() && !cmp_iter->Valid()) {
    // Test iterator is valid while the control is not — only acceptable in
    // undefined prefix-seek scenarios; otherwise a genuine divergence.
    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix seek a non-in-domain key is undefined. Skip checking for
        // this scenario.
        *diverged = true;
        return;
      } else if (!pe->InDomain(iter->key())) {
        // Out of range: the iterator key is not in domain anymore.
        *diverged = true;
        return;
      } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
        // Iterator moved past the seek key's prefix: out of range.
        *diverged = true;
        return;
      }
    }
    fprintf(stderr,
            "Control interator is invalid but iterator has key %s "
            "%s\n",
            iter->key().ToString(true).c_str(), op_logs.c_str());

    *diverged = true;
  } else if (cmp_iter->Valid()) {
    // Iterator is not valid. It can be legitimate if it has already gone
    // out of the upper or lower bound, or been filtered out by the prefix
    // iterator.
    const Slice& total_order_key = cmp_iter->key();

    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix seek a non-in-domain key is undefined. Skip checking for
        // this scenario.
        *diverged = true;
        return;
      }

      if (!pe->InDomain(total_order_key) ||
          pe->Transform(total_order_key) != pe->Transform(seek_key)) {
        // If the prefix is exhausted, the only thing that needs checking
        // is that the iterator doesn't return a position in the prefix.
        // Either way, checking can stop from here.
        *diverged = true;
        if (!iter->Valid() || !pe->InDomain(iter->key()) ||
            pe->Transform(iter->key()) != pe->Transform(seek_key)) {
          return;
        }
        fprintf(stderr,
                "Iterator stays in prefix but contol doesn't"
                " iterator key %s control iterator key %s %s\n",
                iter->key().ToString(true).c_str(),
                cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
      }
    }
    // Check upper or lower bounds: an invalid test iterator is only a
    // divergence if the control's key is strictly inside both bounds.
    if (!*diverged) {
      if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
          (!iter->Valid() &&
           (ro.iterate_upper_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_upper_bound,
                                         /*b_has_ts=*/false) < 0) &&
           (ro.iterate_lower_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_lower_bound,
                                         /*b_has_ts=*/false) > 0))) {
        fprintf(stderr,
                "Iterator diverged from control iterator which"
                " has value %s %s\n",
                total_order_key.ToString(true).c_str(), op_logs.c_str());
        if (iter->Valid()) {
          fprintf(stderr, "iterator has value %s\n",
                  iter->key().ToString(true).c_str());
        } else {
          fprintf(stderr, "iterator is not valid\n");
        }
        *diverged = true;
      }
    }
  }

  // Independently verify that the iterator's wide columns are consistent
  // with its value.
  if (!*diverged && iter->Valid()) {
    const WideColumns expected_columns =
        GenerateExpectedWideColumns(GetValueBase(iter->value()), iter->value());
    if (iter->columns() != expected_columns) {
      fprintf(stderr, "Value and columns inconsistent for iterator: %s\n",
              DebugString(iter->value(), iter->columns(), expected_columns)
                  .c_str());

      *diverged = true;
    }
  }

  if (*diverged) {
    fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
}
1511 | ||
1512 | #ifdef ROCKSDB_LITE | |
1513 | Status StressTest::TestBackupRestore( | |
1514 | ThreadState* /* thread */, | |
1515 | const std::vector<int>& /* rand_column_families */, | |
1516 | const std::vector<int64_t>& /* rand_keys */) { | |
1517 | assert(false); | |
1518 | fprintf(stderr, | |
1519 | "RocksDB lite does not support " | |
1520 | "TestBackupRestore\n"); | |
1521 | std::terminate(); | |
1522 | } | |
1523 | ||
1524 | Status StressTest::TestCheckpoint( | |
1525 | ThreadState* /* thread */, | |
1526 | const std::vector<int>& /* rand_column_families */, | |
1527 | const std::vector<int64_t>& /* rand_keys */) { | |
1528 | assert(false); | |
1529 | fprintf(stderr, | |
1530 | "RocksDB lite does not support " | |
1531 | "TestCheckpoint\n"); | |
1532 | std::terminate(); | |
1533 | } | |
1534 | ||
1535 | void StressTest::TestCompactFiles(ThreadState* /* thread */, | |
1536 | ColumnFamilyHandle* /* column_family */) { | |
1537 | assert(false); | |
1538 | fprintf(stderr, | |
1539 | "RocksDB lite does not support " | |
1540 | "CompactFiles\n"); | |
1541 | std::terminate(); | |
1542 | } | |
1543 | #else // ROCKSDB_LITE | |
// Stress-exercises backup and restore: creates a backup of the live DB with
// randomized BackupEngineOptions, optionally verifies it, then either
// restores it to a per-thread temporary directory or opens it in place
// read-only, and checks existence/non-existence of `rand_keys[0]` in each
// chosen column family against the expected state. `from` records which API
// produced the first failure, for the diagnostic printed at the end.
Status StressTest::TestBackupRestore(
    ThreadState* thread, const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  std::vector<std::unique_ptr<MutexLock>> locks;
  if (ShouldAcquireMutexOnKey()) {
    for (int rand_column_family : rand_column_families) {
      // `rand_keys[0]` on each chosen CF will be verified.
      locks.emplace_back(new MutexLock(
          thread->shared->GetMutexForKey(rand_column_family, rand_keys[0])));
    }
  }

  // Per-thread directories so concurrent threads don't clobber each other.
  const std::string backup_dir =
      FLAGS_db + "/.backup" + std::to_string(thread->tid);
  const std::string restore_dir =
      FLAGS_db + "/.restore" + std::to_string(thread->tid);
  BackupEngineOptions backup_opts(backup_dir);
  // For debugging, get info_log from live options
  backup_opts.info_log = db_->GetDBOptions().info_log.get();
  // Randomize file-sharing / checksum-naming configuration to cover the
  // various backup layouts.
  if (thread->rand.OneIn(10)) {
    backup_opts.share_table_files = false;
  } else {
    backup_opts.share_table_files = true;
    if (thread->rand.OneIn(5)) {
      backup_opts.share_files_with_checksum = false;
    } else {
      backup_opts.share_files_with_checksum = true;
      if (thread->rand.OneIn(2)) {
        // old naming scheme
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kLegacyCrc32cAndFileSize;
      } else {
        // new naming scheme
        backup_opts.share_files_with_checksum_naming =
            BackupEngineOptions::kUseDbSessionId;
      }
      if (thread->rand.OneIn(2)) {
        backup_opts.share_files_with_checksum_naming =
            backup_opts.share_files_with_checksum_naming |
            BackupEngineOptions::kFlagIncludeFileSize;
      }
    }
  }
  // Cover both supported backup meta schema versions.
  if (thread->rand.OneIn(2)) {
    backup_opts.schema_version = 1;
  } else {
    backup_opts.schema_version = 2;
  }
  BackupEngine* backup_engine = nullptr;
  std::string from = "a backup/restore operation";
  Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
  if (!s.ok()) {
    from = "BackupEngine::Open";
  }
  if (s.ok()) {
    if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
      // Randomly toggle optional schema-v2 meta fields via the test hook.
      TEST_BackupMetaSchemaOptions test_opts;
      test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
      test_opts.file_sizes = thread->rand.OneIn(2) == 0;
      TEST_SetBackupMetaSchemaOptions(backup_engine, test_opts);
    }
    CreateBackupOptions create_opts;
    if (FLAGS_disable_wal) {
      // The verification can only work when latest value of `key` is backed
      // up, which requires flushing in case of WAL disabled.
      //
      // Note this triggers a flush with a key lock held. Meanwhile, operations
      // like flush/compaction may attempt to grab key locks like in
      // `DbStressCompactionFilter`. The philosophy around preventing deadlock
      // is the background operation key lock acquisition only tries but does
      // not wait for the lock. So here in the foreground it is OK to hold the
      // lock and wait on a background operation (flush).
      create_opts.flush_before_backup = true;
    }
    s = backup_engine->CreateNewBackup(create_opts, db_);
    if (!s.ok()) {
      from = "BackupEngine::CreateNewBackup";
    }
  }
  if (s.ok()) {
    // Re-open the engine so we also exercise reading back the backup meta
    // files that were just written.
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
    if (!s.ok()) {
      from = "BackupEngine::Open (again)";
    }
  }
  std::vector<BackupInfo> backup_info;
  // If inplace_not_restore, we verify the backup by opening it as a
  // read-only DB. If !inplace_not_restore, we restore it to a temporary
  // directory for verification.
  bool inplace_not_restore = thread->rand.OneIn(3);
  if (s.ok()) {
    backup_engine->GetBackupInfo(&backup_info,
                                 /*include_file_details*/ inplace_not_restore);
    if (backup_info.empty()) {
      s = Status::NotFound("no backups found");
      from = "BackupEngine::GetBackupInfo";
    }
  }
  if (s.ok() && thread->rand.OneIn(2)) {
    s = backup_engine->VerifyBackup(
        backup_info.front().backup_id,
        thread->rand.OneIn(2) /* verify_with_checksum */);
    if (!s.ok()) {
      from = "BackupEngine::VerifyBackup";
    }
  }
  const bool allow_persistent = thread->tid == 0;  // not too many
  bool from_latest = false;
  int count = static_cast<int>(backup_info.size());
  if (s.ok() && !inplace_not_restore) {
    if (count > 1) {
      // Restore a random backup; key verification below is then skipped
      // (only `from_latest` restores can be verified against current state).
      s = backup_engine->RestoreDBFromBackup(
          RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
          restore_dir /* db_dir */, restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromBackup";
      }
    } else {
      from_latest = true;
      s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
                                                   restore_dir /* db_dir */,
                                                   restore_dir /* wal_dir */);
      if (!s.ok()) {
        from = "BackupEngine::RestoreDBFromLatestBackup";
      }
    }
  }
  if (s.ok() && !inplace_not_restore) {
    // Purge early if restoring, to ensure the restored directory doesn't
    // have some secret dependency on the backup directory.
    uint32_t to_keep = 0;
    if (allow_persistent) {
      // allow one thread to keep up to 2 backups
      to_keep = thread->rand.Uniform(3);
    }
    s = backup_engine->PurgeOldBackups(to_keep);
    if (!s.ok()) {
      from = "BackupEngine::PurgeOldBackups";
    }
  }
  DB* restored_db = nullptr;
  std::vector<ColumnFamilyHandle*> restored_cf_handles;
  // Not yet implemented: opening restored BlobDB or TransactionDB
  if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
    Options restore_options(options_);
    restore_options.best_efforts_recovery = false;
    restore_options.listeners.clear();
    // Avoid dangling/shared file descriptors, for reliable destroy
    restore_options.sst_file_manager = nullptr;
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    for (auto name : column_family_names_) {
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
    }
    if (inplace_not_restore) {
      // Open a random backup directly, read-only, through the env the
      // engine provides for that backup.
      BackupInfo& info = backup_info[thread->rand.Uniform(count)];
      restore_options.env = info.env_for_open.get();
      s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
                              cf_descriptors, &restored_cf_handles,
                              &restored_db);
      if (!s.ok()) {
        from = "DB::OpenForReadOnly in backup/restore";
      }
    } else {
      s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
                   &restored_cf_handles, &restored_db);
      if (!s.ok()) {
        from = "DB::Open in backup/restore";
      }
    }
  }
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  //
  // For simplicity, currently only verifies existence/non-existence of a
  // single key
  for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
       ++i) {
    std::string key_str = Key(rand_keys[0]);
    Slice key = key_str;
    std::string restored_value;
    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions read_opts;
    std::string ts_str;
    Slice ts;
    if (FLAGS_user_timestamp_size > 0) {
      ts_str = GetNowNanos();
      ts = ts_str;
      read_opts.timestamp = &ts;
    }
    Status get_status = restored_db->Get(
        read_opts, restored_cf_handles[rand_column_families[i]], key,
        &restored_value);
    bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
    if (get_status.ok()) {
      // Only a restore of the latest backup, taken under key locks, can be
      // compared against the expected state.
      if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
        std::ostringstream oss;
        oss << "0x" << key.ToString(true)
            << " exists in restore but not in original db";
        s = Status::Corruption(oss.str());
      }
    } else if (get_status.IsNotFound()) {
      if (exists && from_latest && ShouldAcquireMutexOnKey()) {
        std::ostringstream oss;
        oss << "0x" << key.ToString(true)
            << " exists in original db but not in restore";
        s = Status::Corruption(oss.str());
      }
    } else {
      s = get_status;
      if (!s.ok()) {
        from = "DB::Get in backup/restore";
      }
    }
  }
  if (restored_db != nullptr) {
    for (auto* cf_handle : restored_cf_handles) {
      restored_db->DestroyColumnFamilyHandle(cf_handle);
    }
    delete restored_db;
    restored_db = nullptr;
  }
  if (s.ok() && inplace_not_restore) {
    // Purge late if inplace open read-only
    uint32_t to_keep = 0;
    if (allow_persistent) {
      // allow one thread to keep up to 2 backups
      to_keep = thread->rand.Uniform(3);
    }
    s = backup_engine->PurgeOldBackups(to_keep);
    if (!s.ok()) {
      from = "BackupEngine::PurgeOldBackups";
    }
  }
  if (backup_engine != nullptr) {
    delete backup_engine;
    backup_engine = nullptr;
  }
  if (s.ok()) {
    // Preserve directories on failure, or allowed persistent backup
    if (!allow_persistent) {
      s = DestroyDir(db_stress_env, backup_dir);
      if (!s.ok()) {
        from = "Destroy backup dir";
      }
    }
  }
  if (s.ok()) {
    s = DestroyDir(db_stress_env, restore_dir);
    if (!s.ok()) {
      from = "Destroy restore dir";
    }
  }
  if (!s.ok()) {
    fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
            s.ToString().c_str());
  }
  return s;
}
1811 | ||
f67539c2 TL |
1812 | Status StressTest::TestApproximateSize( |
1813 | ThreadState* thread, uint64_t iteration, | |
1814 | const std::vector<int>& rand_column_families, | |
1815 | const std::vector<int64_t>& rand_keys) { | |
1816 | // rand_keys likely only has one key. Just use the first one. | |
1817 | assert(!rand_keys.empty()); | |
1818 | assert(!rand_column_families.empty()); | |
1819 | int64_t key1 = rand_keys[0]; | |
1820 | int64_t key2; | |
1821 | if (thread->rand.OneIn(2)) { | |
1822 | // Two totally random keys. This tends to cover large ranges. | |
1823 | key2 = GenerateOneKey(thread, iteration); | |
1824 | if (key2 < key1) { | |
1825 | std::swap(key1, key2); | |
1826 | } | |
1827 | } else { | |
1828 | // Unless users pass a very large FLAGS_max_key, it we should not worry | |
1829 | // about overflow. It is for testing, so we skip the overflow checking | |
1830 | // for simplicity. | |
1831 | key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000)); | |
1832 | } | |
1833 | std::string key1_str = Key(key1); | |
1834 | std::string key2_str = Key(key2); | |
1835 | Range range{Slice(key1_str), Slice(key2_str)}; | |
1836 | SizeApproximationOptions sao; | |
1e59de90 TL |
1837 | sao.include_memtables = thread->rand.OneIn(2); |
1838 | if (sao.include_memtables) { | |
f67539c2 TL |
1839 | sao.include_files = thread->rand.OneIn(2); |
1840 | } | |
1841 | if (thread->rand.OneIn(2)) { | |
1842 | if (thread->rand.OneIn(2)) { | |
1843 | sao.files_size_error_margin = 0.0; | |
1844 | } else { | |
1845 | sao.files_size_error_margin = | |
1846 | static_cast<double>(thread->rand.Uniform(3)); | |
1847 | } | |
1848 | } | |
1849 | uint64_t result; | |
1850 | return db_->GetApproximateSizes( | |
1851 | sao, column_families_[rand_column_families[0]], &range, 1, &result); | |
1852 | } | |
f67539c2 TL |
1853 | |
1854 | Status StressTest::TestCheckpoint(ThreadState* thread, | |
1855 | const std::vector<int>& rand_column_families, | |
1856 | const std::vector<int64_t>& rand_keys) { | |
1e59de90 TL |
1857 | std::vector<std::unique_ptr<MutexLock>> locks; |
1858 | if (ShouldAcquireMutexOnKey()) { | |
1859 | for (int rand_column_family : rand_column_families) { | |
1860 | // `rand_keys[0]` on each chosen CF will be verified. | |
1861 | locks.emplace_back(new MutexLock( | |
1862 | thread->shared->GetMutexForKey(rand_column_family, rand_keys[0]))); | |
1863 | } | |
1864 | } | |
1865 | ||
f67539c2 | 1866 | std::string checkpoint_dir = |
1e59de90 | 1867 | FLAGS_db + "/.checkpoint" + std::to_string(thread->tid); |
f67539c2 TL |
1868 | Options tmp_opts(options_); |
1869 | tmp_opts.listeners.clear(); | |
1e59de90 | 1870 | tmp_opts.env = db_stress_env; |
f67539c2 TL |
1871 | |
1872 | DestroyDB(checkpoint_dir, tmp_opts); | |
1873 | ||
20effc67 TL |
1874 | if (db_stress_env->FileExists(checkpoint_dir).ok()) { |
1875 | // If the directory might still exist, try to delete the files one by one. | |
1876 | // Likely a trash file is still there. | |
1877 | Status my_s = DestroyDir(db_stress_env, checkpoint_dir); | |
1878 | if (!my_s.ok()) { | |
1879 | fprintf(stderr, "Fail to destory directory before checkpoint: %s", | |
1880 | my_s.ToString().c_str()); | |
1881 | } | |
1882 | } | |
1883 | ||
f67539c2 TL |
1884 | Checkpoint* checkpoint = nullptr; |
1885 | Status s = Checkpoint::Create(db_, &checkpoint); | |
1886 | if (s.ok()) { | |
1887 | s = checkpoint->CreateCheckpoint(checkpoint_dir); | |
20effc67 TL |
1888 | if (!s.ok()) { |
1889 | fprintf(stderr, "Fail to create checkpoint to %s\n", | |
1890 | checkpoint_dir.c_str()); | |
1891 | std::vector<std::string> files; | |
1892 | Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files); | |
1893 | if (my_s.ok()) { | |
1894 | for (const auto& f : files) { | |
1895 | fprintf(stderr, " %s\n", f.c_str()); | |
1896 | } | |
1897 | } else { | |
1898 | fprintf(stderr, "Fail to get files under the directory to %s\n", | |
1899 | my_s.ToString().c_str()); | |
1900 | } | |
1901 | } | |
f67539c2 | 1902 | } |
1e59de90 TL |
1903 | delete checkpoint; |
1904 | checkpoint = nullptr; | |
f67539c2 TL |
1905 | std::vector<ColumnFamilyHandle*> cf_handles; |
1906 | DB* checkpoint_db = nullptr; | |
1907 | if (s.ok()) { | |
f67539c2 | 1908 | Options options(options_); |
1e59de90 | 1909 | options.best_efforts_recovery = false; |
f67539c2 | 1910 | options.listeners.clear(); |
1e59de90 TL |
1911 | // Avoid race condition in trash handling after delete checkpoint_db |
1912 | options.sst_file_manager.reset(); | |
f67539c2 TL |
1913 | std::vector<ColumnFamilyDescriptor> cf_descs; |
1914 | // TODO(ajkr): `column_family_names_` is not safe to access here when | |
1915 | // `clear_column_family_one_in != 0`. But we can't easily switch to | |
1916 | // `ListColumnFamilies` to get names because it won't necessarily give | |
1917 | // the same order as `column_family_names_`. | |
20effc67 | 1918 | assert(FLAGS_clear_column_family_one_in == 0); |
f67539c2 TL |
1919 | if (FLAGS_clear_column_family_one_in == 0) { |
1920 | for (const auto& name : column_family_names_) { | |
1921 | cf_descs.emplace_back(name, ColumnFamilyOptions(options)); | |
1922 | } | |
1923 | s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs, | |
1924 | &cf_handles, &checkpoint_db); | |
1925 | } | |
1926 | } | |
1927 | if (checkpoint_db != nullptr) { | |
20effc67 TL |
1928 | // Note the column families chosen by `rand_column_families` cannot be |
1929 | // dropped while the locks for `rand_keys` are held. So we should not have | |
1930 | // to worry about accessing those column families throughout this function. | |
f67539c2 | 1931 | for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) { |
20effc67 | 1932 | std::string key_str = Key(rand_keys[0]); |
f67539c2 | 1933 | Slice key = key_str; |
1e59de90 TL |
1934 | std::string ts_str; |
1935 | Slice ts; | |
1936 | ReadOptions read_opts; | |
1937 | if (FLAGS_user_timestamp_size > 0) { | |
1938 | ts_str = GetNowNanos(); | |
1939 | ts = ts_str; | |
1940 | read_opts.timestamp = &ts; | |
1941 | } | |
f67539c2 TL |
1942 | std::string value; |
1943 | Status get_status = checkpoint_db->Get( | |
1e59de90 | 1944 | read_opts, cf_handles[rand_column_families[i]], key, &value); |
f67539c2 | 1945 | bool exists = |
20effc67 | 1946 | thread->shared->Exists(rand_column_families[i], rand_keys[0]); |
f67539c2 | 1947 | if (get_status.ok()) { |
20effc67 | 1948 | if (!exists && ShouldAcquireMutexOnKey()) { |
1e59de90 TL |
1949 | std::ostringstream oss; |
1950 | oss << "0x" << key.ToString(true) << " exists in checkpoint " | |
1951 | << checkpoint_dir << " but not in original db"; | |
1952 | s = Status::Corruption(oss.str()); | |
f67539c2 TL |
1953 | } |
1954 | } else if (get_status.IsNotFound()) { | |
20effc67 | 1955 | if (exists && ShouldAcquireMutexOnKey()) { |
1e59de90 TL |
1956 | std::ostringstream oss; |
1957 | oss << "0x" << key.ToString(true) | |
1958 | << " exists in original db but not in checkpoint " | |
1959 | << checkpoint_dir; | |
1960 | s = Status::Corruption(oss.str()); | |
f67539c2 TL |
1961 | } |
1962 | } else { | |
1963 | s = get_status; | |
1964 | } | |
1965 | } | |
1966 | for (auto cfh : cf_handles) { | |
1967 | delete cfh; | |
1968 | } | |
1969 | cf_handles.clear(); | |
1970 | delete checkpoint_db; | |
1971 | checkpoint_db = nullptr; | |
1972 | } | |
1973 | ||
f67539c2 TL |
1974 | if (!s.ok()) { |
1975 | fprintf(stderr, "A checkpoint operation failed with: %s\n", | |
1976 | s.ToString().c_str()); | |
20effc67 TL |
1977 | } else { |
1978 | DestroyDB(checkpoint_dir, tmp_opts); | |
f67539c2 TL |
1979 | } |
1980 | return s; | |
1981 | } | |
1982 | ||
20effc67 TL |
1983 | void StressTest::TestGetProperty(ThreadState* thread) const { |
1984 | std::unordered_set<std::string> levelPropertyNames = { | |
1985 | DB::Properties::kAggregatedTablePropertiesAtLevel, | |
1986 | DB::Properties::kCompressionRatioAtLevelPrefix, | |
1987 | DB::Properties::kNumFilesAtLevelPrefix, | |
1988 | }; | |
1989 | std::unordered_set<std::string> unknownPropertyNames = { | |
1990 | DB::Properties::kEstimateOldestKeyTime, | |
1991 | DB::Properties::kOptionsStatistics, | |
1e59de90 TL |
1992 | DB::Properties:: |
1993 | kLiveSstFilesSizeAtTemperature, // similar to levelPropertyNames, it | |
1994 | // requires a number suffix | |
20effc67 TL |
1995 | }; |
1996 | unknownPropertyNames.insert(levelPropertyNames.begin(), | |
1997 | levelPropertyNames.end()); | |
1998 | ||
1e59de90 TL |
1999 | std::unordered_set<std::string> blobCachePropertyNames = { |
2000 | DB::Properties::kBlobCacheCapacity, | |
2001 | DB::Properties::kBlobCacheUsage, | |
2002 | DB::Properties::kBlobCachePinnedUsage, | |
2003 | }; | |
2004 | if (db_->GetOptions().blob_cache == nullptr) { | |
2005 | unknownPropertyNames.insert(blobCachePropertyNames.begin(), | |
2006 | blobCachePropertyNames.end()); | |
2007 | } | |
2008 | ||
20effc67 TL |
2009 | std::string prop; |
2010 | for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) { | |
2011 | bool res = db_->GetProperty(ppt_name_and_info.first, &prop); | |
2012 | if (unknownPropertyNames.find(ppt_name_and_info.first) == | |
2013 | unknownPropertyNames.end()) { | |
2014 | if (!res) { | |
2015 | fprintf(stderr, "Failed to get DB property: %s\n", | |
2016 | ppt_name_and_info.first.c_str()); | |
2017 | thread->shared->SetVerificationFailure(); | |
2018 | } | |
2019 | if (ppt_name_and_info.second.handle_int != nullptr) { | |
2020 | uint64_t prop_int; | |
2021 | if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) { | |
2022 | fprintf(stderr, "Failed to get Int property: %s\n", | |
2023 | ppt_name_and_info.first.c_str()); | |
2024 | thread->shared->SetVerificationFailure(); | |
2025 | } | |
2026 | } | |
1e59de90 TL |
2027 | if (ppt_name_and_info.second.handle_map != nullptr) { |
2028 | std::map<std::string, std::string> prop_map; | |
2029 | if (!db_->GetMapProperty(ppt_name_and_info.first, &prop_map)) { | |
2030 | fprintf(stderr, "Failed to get Map property: %s\n", | |
2031 | ppt_name_and_info.first.c_str()); | |
2032 | thread->shared->SetVerificationFailure(); | |
2033 | } | |
2034 | } | |
20effc67 TL |
2035 | } |
2036 | } | |
2037 | ||
2038 | ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data; | |
2039 | db_->GetColumnFamilyMetaData(&cf_meta_data); | |
2040 | int level_size = static_cast<int>(cf_meta_data.levels.size()); | |
2041 | for (int level = 0; level < level_size; level++) { | |
2042 | for (const auto& ppt_name : levelPropertyNames) { | |
2043 | bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop); | |
2044 | if (!res) { | |
2045 | fprintf(stderr, "Failed to get DB property: %s\n", | |
2046 | (ppt_name + std::to_string(level)).c_str()); | |
2047 | thread->shared->SetVerificationFailure(); | |
2048 | } | |
2049 | } | |
2050 | } | |
2051 | ||
2052 | // Test for an invalid property name | |
2053 | if (thread->rand.OneIn(100)) { | |
2054 | if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) { | |
2055 | fprintf(stderr, "Failed to return false for invalid property name\n"); | |
2056 | thread->shared->SetVerificationFailure(); | |
2057 | } | |
2058 | } | |
2059 | } | |
2060 | ||
f67539c2 TL |
2061 | void StressTest::TestCompactFiles(ThreadState* thread, |
2062 | ColumnFamilyHandle* column_family) { | |
2063 | ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data; | |
2064 | db_->GetColumnFamilyMetaData(column_family, &cf_meta_data); | |
2065 | ||
1e59de90 TL |
2066 | if (cf_meta_data.levels.empty()) { |
2067 | return; | |
2068 | } | |
2069 | ||
f67539c2 TL |
2070 | // Randomly compact up to three consecutive files from a level |
2071 | const int kMaxRetry = 3; | |
2072 | for (int attempt = 0; attempt < kMaxRetry; ++attempt) { | |
2073 | size_t random_level = | |
2074 | thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size())); | |
2075 | ||
2076 | const auto& files = cf_meta_data.levels[random_level].files; | |
2077 | if (files.size() > 0) { | |
2078 | size_t random_file_index = | |
2079 | thread->rand.Uniform(static_cast<int>(files.size())); | |
2080 | if (files[random_file_index].being_compacted) { | |
2081 | // Retry as the selected file is currently being compacted | |
2082 | continue; | |
2083 | } | |
2084 | ||
2085 | std::vector<std::string> input_files; | |
2086 | input_files.push_back(files[random_file_index].name); | |
2087 | if (random_file_index > 0 && | |
2088 | !files[random_file_index - 1].being_compacted) { | |
2089 | input_files.push_back(files[random_file_index - 1].name); | |
2090 | } | |
2091 | if (random_file_index + 1 < files.size() && | |
2092 | !files[random_file_index + 1].being_compacted) { | |
2093 | input_files.push_back(files[random_file_index + 1].name); | |
2094 | } | |
2095 | ||
2096 | size_t output_level = | |
2097 | std::min(random_level + 1, cf_meta_data.levels.size() - 1); | |
2098 | auto s = db_->CompactFiles(CompactionOptions(), column_family, | |
2099 | input_files, static_cast<int>(output_level)); | |
2100 | if (!s.ok()) { | |
2101 | fprintf(stdout, "Unable to perform CompactFiles(): %s\n", | |
2102 | s.ToString().c_str()); | |
2103 | thread->stats.AddNumCompactFilesFailed(1); | |
2104 | } else { | |
2105 | thread->stats.AddNumCompactFilesSucceed(1); | |
2106 | } | |
2107 | break; | |
2108 | } | |
2109 | } | |
2110 | } | |
2111 | #endif // ROCKSDB_LITE | |
2112 | ||
2113 | Status StressTest::TestFlush(const std::vector<int>& rand_column_families) { | |
2114 | FlushOptions flush_opts; | |
1e59de90 TL |
2115 | if (FLAGS_atomic_flush) { |
2116 | return db_->Flush(flush_opts, column_families_); | |
2117 | } | |
f67539c2 TL |
2118 | std::vector<ColumnFamilyHandle*> cfhs; |
2119 | std::for_each(rand_column_families.begin(), rand_column_families.end(), | |
2120 | [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); }); | |
2121 | return db_->Flush(flush_opts, cfhs); | |
2122 | } | |
2123 | ||
2124 | Status StressTest::TestPauseBackground(ThreadState* thread) { | |
2125 | Status status = db_->PauseBackgroundWork(); | |
2126 | if (!status.ok()) { | |
2127 | return status; | |
2128 | } | |
2129 | // To avoid stalling/deadlocking ourself in this thread, just | |
2130 | // sleep here during pause and let other threads do db operations. | |
2131 | // Sleep up to ~16 seconds (2**24 microseconds), but very skewed | |
2132 | // toward short pause. (1 chance in 25 of pausing >= 1s; | |
2133 | // 1 chance in 625 of pausing full 16s.) | |
2134 | int pwr2_micros = | |
2135 | std::min(thread->rand.Uniform(25), thread->rand.Uniform(25)); | |
1e59de90 | 2136 | clock_->SleepForMicroseconds(1 << pwr2_micros); |
f67539c2 TL |
2137 | return db_->ContinueBackgroundWork(); |
2138 | } | |
2139 | ||
// Takes a snapshot (in non-LITE builds, 1-in-10 times a write-conflict-
// boundary snapshot from the DBImpl), reads `keystr` under it, optionally
// records the full set of live keys, and enqueues the state so a later
// operation can re-read under the same snapshot and verify the result is
// unchanged (see MaybeReleaseSnapshots / AssertSame).
void StressTest::TestAcquireSnapshot(ThreadState* thread,
                                     int rand_column_family,
                                     const std::string& keystr, uint64_t i) {
  Slice key = keystr;
  ColumnFamilyHandle* column_family = column_families_[rand_column_family];
  // This `ReadOptions` is for validation purposes. Ignore
  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
  ReadOptions ropt;
#ifndef ROCKSDB_LITE
  auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
  const bool ww_snapshot = thread->rand.OneIn(10);
  const Snapshot* snapshot =
      ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
                  : db_->GetSnapshot();
#else
  const Snapshot* snapshot = db_->GetSnapshot();
#endif  // !ROCKSDB_LITE
  ropt.snapshot = snapshot;

  // Ideally, we want snapshot taking and timestamp generation to be atomic
  // here, so that the snapshot corresponds to the timestamp. However, it is
  // not possible with current GetSnapshot() API.
  std::string ts_str;
  Slice ts;
  if (FLAGS_user_timestamp_size > 0) {
    ts_str = GetNowNanos();
    ts = ts_str;
    ropt.timestamp = &ts;
  }

  std::string value_at;
  // When taking a snapshot, we also read a key from that snapshot. We
  // will later read the same key before releasing the snapshot and
  // verify that the results are the same.
  auto status_at = db_->Get(ropt, column_family, key, &value_at);
  // Optional full-DB key-existence bitmap; ownership passes to snap_state
  // and is freed in MaybeReleaseSnapshots.
  std::vector<bool>* key_vec = nullptr;

  if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
    key_vec = new std::vector<bool>(FLAGS_max_key);
    // When `prefix_extractor` is set, seeking to beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*key_vec)[key_val] = true;
      }
    }
  }

  ThreadState::SnapshotState snap_state = {snapshot,
                                           rand_column_family,
                                           column_family->GetName(),
                                           keystr,
                                           status_at,
                                           value_at,
                                           key_vec,
                                           ts_str};
  uint64_t hold_for = FLAGS_snapshot_hold_ops;
  if (FLAGS_long_running_snapshots) {
    // Hold 10% of snapshots for 10x more
    if (thread->rand.OneIn(10)) {
      assert(hold_for < std::numeric_limits<uint64_t>::max() / 10);
      hold_for *= 10;
      // Hold 1% of snapshots for 100x more
      if (thread->rand.OneIn(10)) {
        assert(hold_for < std::numeric_limits<uint64_t>::max() / 10);
        hold_for *= 10;
      }
    }
  }
  // Schedule verification/release after `hold_for` more operations, clamped
  // to the end of this thread's workload.
  uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
  thread->snapshot_queue.emplace(release_at, snap_state);
}
2215 | ||
2216 | Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) { | |
2217 | while (!thread->snapshot_queue.empty() && | |
2218 | i >= thread->snapshot_queue.front().first) { | |
2219 | auto snap_state = thread->snapshot_queue.front().second; | |
2220 | assert(snap_state.snapshot); | |
2221 | // Note: this is unsafe as the cf might be dropped concurrently. But | |
2222 | // it is ok since unclean cf drop is cunnrently not supported by write | |
2223 | // prepared transactions. | |
2224 | Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state); | |
2225 | db_->ReleaseSnapshot(snap_state.snapshot); | |
2226 | delete snap_state.key_vec; | |
2227 | thread->snapshot_queue.pop(); | |
2228 | if (!s.ok()) { | |
2229 | return s; | |
2230 | } | |
2231 | } | |
2232 | return Status::OK(); | |
2233 | } | |
2234 | ||
// Exercises DB::CompactRange() on [start_key, rand_key + compact_range_width]
// with randomized CompactRangeOptions. With probability 1/2 it also takes a
// snapshot before compacting and verifies that a CRC over the key range is
// unchanged afterwards; a mismatch is recorded as a verification failure.
void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
                                  const Slice& start_key,
                                  ColumnFamilyHandle* column_family) {
  int64_t end_key_num;
  // Clamp the end key to int64 max when rand_key + width would overflow.
  if (std::numeric_limits<int64_t>::max() - rand_key <
      FLAGS_compact_range_width) {
    end_key_num = std::numeric_limits<int64_t>::max();
  } else {
    end_key_num = FLAGS_compact_range_width + rand_key;
  }
  std::string end_key_buf = Key(end_key_num);
  Slice end_key(end_key_buf);

  // Randomize every CompactRangeOptions knob. NOTE: the exact order of
  // thread->rand draws below is part of the test's reproducible behavior.
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
  cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
  std::vector<BottommostLevelCompaction> bottom_level_styles = {
      BottommostLevelCompaction::kSkip,
      BottommostLevelCompaction::kIfHaveCompactionFilter,
      BottommostLevelCompaction::kForce,
      BottommostLevelCompaction::kForceOptimized};
  cro.bottommost_level_compaction =
      bottom_level_styles[thread->rand.Next() %
                          static_cast<uint32_t>(bottom_level_styles.size())];
  cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
  cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
  std::vector<BlobGarbageCollectionPolicy> blob_gc_policies = {
      BlobGarbageCollectionPolicy::kForce,
      BlobGarbageCollectionPolicy::kDisable,
      BlobGarbageCollectionPolicy::kUseDefault};
  cro.blob_garbage_collection_policy =
      blob_gc_policies[thread->rand.Next() %
                       static_cast<uint32_t>(blob_gc_policies.size())];
  // Age cutoff uniformly in [0.0, 0.99].
  cro.blob_garbage_collection_age_cutoff =
      static_cast<double>(thread->rand.Next() % 100) / 100.0;

  const Snapshot* pre_snapshot = nullptr;
  uint32_t pre_hash = 0;
  if (thread->rand.OneIn(2)) {
    // Do some validation by declaring a snapshot and compare the data before
    // and after the compaction
    pre_snapshot = db_->GetSnapshot();
    pre_hash =
        GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
  }

  Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);

  // A failed manual compaction is logged but tolerated (e.g. write stalls
  // when allow_write_stall is false); it is not a verification failure.
  if (!status.ok()) {
    fprintf(stdout, "Unable to perform CompactRange(): %s\n",
            status.ToString().c_str());
  }

  if (pre_snapshot != nullptr) {
    // Re-hash the same range through the pre-compaction snapshot; compaction
    // must not change data visible at that snapshot.
    uint32_t post_hash =
        GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
    if (pre_hash != post_hash) {
      fprintf(stderr,
              "Data hash different before and after compact range "
              "start_key %s end_key %s\n",
              start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
      thread->stats.AddErrors(1);
      // Fail fast to preserve the DB state.
      thread->shared->SetVerificationFailure();
    }
    db_->ReleaseSnapshot(pre_snapshot);
  }
}
2303 | ||
2304 | uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot, | |
2305 | ColumnFamilyHandle* column_family, | |
2306 | const Slice& start_key, | |
2307 | const Slice& end_key) { | |
1e59de90 TL |
2308 | // This `ReadOptions` is for validation purposes. Ignore |
2309 | // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. | |
f67539c2 TL |
2310 | ReadOptions ro; |
2311 | ro.snapshot = snapshot; | |
2312 | ro.total_order_seek = true; | |
1e59de90 TL |
2313 | std::string ts_str; |
2314 | Slice ts; | |
2315 | if (FLAGS_user_timestamp_size > 0) { | |
2316 | ts_str = GetNowNanos(); | |
2317 | ts = ts_str; | |
2318 | ro.timestamp = &ts; | |
2319 | } | |
2320 | ||
f67539c2 | 2321 | std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family)); |
1e59de90 TL |
2322 | |
2323 | constexpr char kCrcCalculatorSepearator = ';'; | |
2324 | ||
2325 | uint32_t crc = 0; | |
2326 | ||
f67539c2 TL |
2327 | for (it->Seek(start_key); |
2328 | it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0; | |
2329 | it->Next()) { | |
2330 | crc = crc32c::Extend(crc, it->key().data(), it->key().size()); | |
1e59de90 | 2331 | crc = crc32c::Extend(crc, &kCrcCalculatorSepearator, sizeof(char)); |
f67539c2 | 2332 | crc = crc32c::Extend(crc, it->value().data(), it->value().size()); |
1e59de90 TL |
2333 | crc = crc32c::Extend(crc, &kCrcCalculatorSepearator, sizeof(char)); |
2334 | ||
2335 | for (const auto& column : it->columns()) { | |
2336 | crc = crc32c::Extend(crc, column.name().data(), column.name().size()); | |
2337 | crc = crc32c::Extend(crc, &kCrcCalculatorSepearator, sizeof(char)); | |
2338 | crc = crc32c::Extend(crc, column.value().data(), column.value().size()); | |
2339 | crc = crc32c::Extend(crc, &kCrcCalculatorSepearator, sizeof(char)); | |
2340 | } | |
f67539c2 | 2341 | } |
1e59de90 | 2342 | |
f67539c2 TL |
2343 | if (!it->status().ok()) { |
2344 | fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n", | |
2345 | it->status().ToString().c_str()); | |
2346 | thread->stats.AddErrors(1); | |
2347 | // Fail fast to preserve the DB state. | |
2348 | thread->shared->SetVerificationFailure(); | |
2349 | } | |
1e59de90 | 2350 | |
f67539c2 TL |
2351 | return crc; |
2352 | } | |
2353 | ||
// Dumps the effective stress-test configuration (flags and derived option
// values) to stdout before the run starts, so a failing run's environment
// can be reconstructed from its log. Output only; no state is modified.
void StressTest::PrintEnv() const {
  fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
          kMinorVersion);
  fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
  fprintf(stdout, "TransactionDB : %s\n",
          FLAGS_use_txn ? "true" : "false");

  if (FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
    fprintf(stdout, "Two write queues: : %s\n",
            FLAGS_two_write_queues ? "true" : "false");
    fprintf(stdout, "Write policy : %d\n",
            static_cast<int>(FLAGS_txn_write_policy));
    // Snapshot/commit cache sizing only applies to the write-prepared and
    // write-unprepared transaction policies.
    if (static_cast<uint64_t>(TxnDBWritePolicy::WRITE_PREPARED) ==
            FLAGS_txn_write_policy ||
        static_cast<uint64_t>(TxnDBWritePolicy::WRITE_UNPREPARED) ==
            FLAGS_txn_write_policy) {
      fprintf(stdout, "Snapshot cache bits : %d\n",
              static_cast<int>(FLAGS_wp_snapshot_cache_bits));
      fprintf(stdout, "Commit cache bits : %d\n",
              static_cast<int>(FLAGS_wp_commit_cache_bits));
    }
    fprintf(stdout, "last cwb for recovery : %s\n",
            FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true"
                                                                   : "false");
#endif  // !ROCKSDB_LITE
  }

#ifndef ROCKSDB_LITE
  fprintf(stdout, "Stacked BlobDB : %s\n",
          FLAGS_use_blob_db ? "true" : "false");
#endif  // !ROCKSDB_LITE
  fprintf(stdout, "Read only mode : %s\n",
          FLAGS_read_only ? "true" : "false");
  fprintf(stdout, "Atomic flush : %s\n",
          FLAGS_atomic_flush ? "true" : "false");
  fprintf(stdout, "Manual WAL flush : %s\n",
          FLAGS_manual_wal_flush_one_in > 0 ? "true" : "false");
  fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
  if (!FLAGS_test_batches_snapshots) {
    fprintf(stdout, "Clear CFs one in : %d\n",
            FLAGS_clear_column_family_one_in);
  }
  fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
  fprintf(stdout, "Ops per thread : %lu\n",
          (unsigned long)FLAGS_ops_per_thread);
  std::string ttl_state("unused");
  if (FLAGS_ttl > 0) {
    ttl_state = std::to_string(FLAGS_ttl);
  }
  fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
  fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
  fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
  fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
  fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
  fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
  fprintf(stdout, "No overwrite percentage : %d%%\n",
          FLAGS_nooverwritepercent);
  fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
  fprintf(stdout, "Custom ops percentage : %d%%\n", FLAGS_customopspercent);
  fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
          FLAGS_db_write_buffer_size);
  fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
  fprintf(stdout, "Iterations : %lu\n",
          (unsigned long)FLAGS_num_iterations);
  fprintf(stdout, "Max key : %lu\n",
          (unsigned long)FLAGS_max_key);
  fprintf(stdout, "Ratio #ops/#keys : %f\n",
          (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
  fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
  fprintf(stdout, "Batches/snapshots : %d\n",
          FLAGS_test_batches_snapshots);
  fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
  fprintf(stdout, "Num keys per lock : %d\n",
          1 << FLAGS_log2_keys_per_lock);
  std::string compression = CompressionTypeToString(compression_type_e);
  fprintf(stdout, "Compression : %s\n", compression.c_str());
  std::string bottommost_compression =
      CompressionTypeToString(bottommost_compression_type_e);
  fprintf(stdout, "Bottommost Compression : %s\n",
          bottommost_compression.c_str());
  std::string checksum = ChecksumTypeToString(checksum_type_e);
  fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
  fprintf(stdout, "File checksum impl : %s\n",
          FLAGS_file_checksum_impl.c_str());
  fprintf(stdout, "Bloom bits / key : %s\n",
          FormatDoubleParam(FLAGS_bloom_bits).c_str());
  fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
          FLAGS_subcompactions);
  fprintf(stdout, "Use MultiGet : %s\n",
          FLAGS_use_multiget ? "true" : "false");

  // Translate the memtable-representation enum to a printable name.
  const char* memtablerep = "";
  switch (FLAGS_rep_factory) {
    case kSkipList:
      memtablerep = "skip_list";
      break;
    case kHashSkipList:
      memtablerep = "prefix_hash";
      break;
    case kVectorRep:
      memtablerep = "vector";
      break;
  }

  fprintf(stdout, "Memtablerep : %s\n", memtablerep);

#ifndef NDEBUG
  // Kill-point settings only exist in debug builds.
  KillPoint* kp = KillPoint::GetInstance();
  fprintf(stdout, "Test kill odd : %d\n", kp->rocksdb_kill_odds);
  if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
    fprintf(stdout, "Skipping kill points prefixes:\n");
    for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
      fprintf(stdout, " %s\n", p.c_str());
    }
  }
#endif
  fprintf(stdout, "Periodic Compaction Secs : %" PRIu64 "\n",
          FLAGS_periodic_compaction_seconds);
  fprintf(stdout, "Compaction TTL : %" PRIu64 "\n",
          FLAGS_compaction_ttl);
  // Translate the compaction-priority enum to a printable name.
  const char* compaction_pri = "";
  switch (FLAGS_compaction_pri) {
    case kByCompensatedSize:
      compaction_pri = "kByCompensatedSize";
      break;
    case kOldestLargestSeqFirst:
      compaction_pri = "kOldestLargestSeqFirst";
      break;
    case kOldestSmallestSeqFirst:
      compaction_pri = "kOldestSmallestSeqFirst";
      break;
    case kMinOverlappingRatio:
      compaction_pri = "kMinOverlappingRatio";
      break;
    case kRoundRobin:
      compaction_pri = "kRoundRobin";
      break;
  }
  fprintf(stdout, "Compaction Pri : %s\n", compaction_pri);
  fprintf(stdout, "Background Purge : %d\n",
          static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
  fprintf(stdout, "Write DB ID to manifest : %d\n",
          static_cast<int>(FLAGS_write_dbid_to_manifest));
  fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
          FLAGS_max_write_batch_group_size_bytes);
  fprintf(stdout, "Use dynamic level : %d\n",
          static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
  fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
  fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
  fprintf(stdout, "Open metadata write fault one in:\n");
  fprintf(stdout, " %d\n",
          FLAGS_open_metadata_write_fault_one_in);
  fprintf(stdout, "Sync fault injection : %d\n",
          FLAGS_sync_fault_injection);
  fprintf(stdout, "Best efforts recovery : %d\n",
          static_cast<int>(FLAGS_best_efforts_recovery));
  fprintf(stdout, "Fail if OPTIONS file error: %d\n",
          static_cast<int>(FLAGS_fail_if_options_file_error));
  fprintf(stdout, "User timestamp size bytes : %d\n",
          static_cast<int>(FLAGS_user_timestamp_size));
  fprintf(stdout, "WAL compression : %s\n",
          FLAGS_wal_compression.c_str());
  fprintf(stdout, "Try verify sst unique id : %d\n",
          static_cast<int>(FLAGS_verify_sst_unique_id_in_manifest));

  fprintf(stdout, "------------------------------------------------\n");
}
2522 | ||
1e59de90 | 2523 | void StressTest::Open(SharedState* shared) { |
f67539c2 TL |
2524 | assert(db_ == nullptr); |
2525 | #ifndef ROCKSDB_LITE | |
2526 | assert(txn_db_ == nullptr); | |
f67539c2 | 2527 | #else |
1e59de90 TL |
2528 | (void)shared; |
2529 | #endif | |
2530 | if (!InitializeOptionsFromFile(options_)) { | |
2531 | InitializeOptionsFromFlags(cache_, compressed_cache_, filter_policy_, | |
2532 | options_); | |
20effc67 | 2533 | } |
1e59de90 | 2534 | InitializeOptionsGeneral(cache_, compressed_cache_, filter_policy_, options_); |
f67539c2 TL |
2535 | |
2536 | if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) { | |
2537 | fprintf(stderr, | |
2538 | "prefeix_size cannot be zero if memtablerep == prefix_hash\n"); | |
2539 | exit(1); | |
2540 | } | |
2541 | if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) { | |
2542 | fprintf(stderr, | |
2543 | "WARNING: prefix_size is non-zero but " | |
2544 | "memtablerep != prefix_hash\n"); | |
2545 | } | |
1e59de90 TL |
2546 | |
2547 | if ((options_.enable_blob_files || options_.enable_blob_garbage_collection || | |
2548 | FLAGS_allow_setting_blob_options_dynamically) && | |
2549 | FLAGS_best_efforts_recovery) { | |
2550 | fprintf(stderr, | |
2551 | "Integrated BlobDB is currently incompatible with best-effort " | |
2552 | "recovery\n"); | |
2553 | exit(1); | |
f67539c2 TL |
2554 | } |
2555 | ||
1e59de90 TL |
2556 | fprintf(stdout, |
2557 | "Integrated BlobDB: blob files enabled %d, min blob size %" PRIu64 | |
2558 | ", blob file size %" PRIu64 | |
2559 | ", blob compression type %s, blob GC enabled %d, cutoff %f, force " | |
2560 | "threshold %f, blob compaction readahead size %" PRIu64 | |
2561 | ", blob file starting level %d\n", | |
2562 | options_.enable_blob_files, options_.min_blob_size, | |
2563 | options_.blob_file_size, | |
2564 | CompressionTypeToString(options_.blob_compression_type).c_str(), | |
2565 | options_.enable_blob_garbage_collection, | |
2566 | options_.blob_garbage_collection_age_cutoff, | |
2567 | options_.blob_garbage_collection_force_threshold, | |
2568 | options_.blob_compaction_readahead_size, | |
2569 | options_.blob_file_starting_level); | |
2570 | ||
2571 | if (FLAGS_use_blob_cache) { | |
2572 | fprintf(stdout, | |
2573 | "Integrated BlobDB: blob cache enabled" | |
2574 | ", block and blob caches shared: %d", | |
2575 | FLAGS_use_shared_block_and_blob_cache); | |
2576 | if (!FLAGS_use_shared_block_and_blob_cache) { | |
2577 | fprintf(stdout, | |
2578 | ", blob cache size %" PRIu64 ", blob cache num shard bits: %d", | |
2579 | FLAGS_blob_cache_size, FLAGS_blob_cache_numshardbits); | |
2580 | } | |
2581 | fprintf(stdout, ", blob cache prepopulated: %d\n", | |
2582 | FLAGS_prepopulate_blob_cache); | |
f67539c2 | 2583 | } else { |
1e59de90 | 2584 | fprintf(stdout, "Integrated BlobDB: blob cache disabled\n"); |
20effc67 | 2585 | } |
f67539c2 TL |
2586 | |
2587 | fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); | |
2588 | ||
2589 | Status s; | |
1e59de90 | 2590 | |
f67539c2 TL |
2591 | if (FLAGS_ttl == -1) { |
2592 | std::vector<std::string> existing_column_families; | |
2593 | s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, | |
2594 | &existing_column_families); // ignore errors | |
2595 | if (!s.ok()) { | |
2596 | // DB doesn't exist | |
2597 | assert(existing_column_families.empty()); | |
2598 | assert(column_family_names_.empty()); | |
2599 | column_family_names_.push_back(kDefaultColumnFamilyName); | |
2600 | } else if (column_family_names_.empty()) { | |
2601 | // this is the first call to the function Open() | |
2602 | column_family_names_ = existing_column_families; | |
2603 | } else { | |
2604 | // this is a reopen. just assert that existing column_family_names are | |
2605 | // equivalent to what we remember | |
2606 | auto sorted_cfn = column_family_names_; | |
2607 | std::sort(sorted_cfn.begin(), sorted_cfn.end()); | |
2608 | std::sort(existing_column_families.begin(), | |
2609 | existing_column_families.end()); | |
2610 | if (sorted_cfn != existing_column_families) { | |
2611 | fprintf(stderr, "Expected column families differ from the existing:\n"); | |
2612 | fprintf(stderr, "Expected: {"); | |
2613 | for (auto cf : sorted_cfn) { | |
2614 | fprintf(stderr, "%s ", cf.c_str()); | |
2615 | } | |
2616 | fprintf(stderr, "}\n"); | |
2617 | fprintf(stderr, "Existing: {"); | |
2618 | for (auto cf : existing_column_families) { | |
2619 | fprintf(stderr, "%s ", cf.c_str()); | |
2620 | } | |
2621 | fprintf(stderr, "}\n"); | |
2622 | } | |
2623 | assert(sorted_cfn == existing_column_families); | |
2624 | } | |
2625 | std::vector<ColumnFamilyDescriptor> cf_descriptors; | |
2626 | for (auto name : column_family_names_) { | |
2627 | if (name != kDefaultColumnFamilyName) { | |
2628 | new_column_family_name_ = | |
2629 | std::max(new_column_family_name_.load(), std::stoi(name) + 1); | |
2630 | } | |
2631 | cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); | |
2632 | } | |
2633 | while (cf_descriptors.size() < (size_t)FLAGS_column_families) { | |
1e59de90 | 2634 | std::string name = std::to_string(new_column_family_name_.load()); |
f67539c2 TL |
2635 | new_column_family_name_++; |
2636 | cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); | |
2637 | column_family_names_.push_back(name); | |
2638 | } | |
1e59de90 | 2639 | |
f67539c2 | 2640 | options_.listeners.clear(); |
f67539c2 | 2641 | #ifndef ROCKSDB_LITE |
1e59de90 TL |
2642 | options_.listeners.emplace_back(new DbStressListener( |
2643 | FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env)); | |
2644 | #endif // !ROCKSDB_LITE | |
2645 | RegisterAdditionalListeners(); | |
2646 | ||
2647 | if (!FLAGS_use_txn) { | |
2648 | // Determine whether we need to ingest file metadata write failures | |
2649 | // during DB reopen. If it does, enable it. | |
2650 | // Only ingest metadata error if it is reopening, as initial open | |
2651 | // failure doesn't need to be handled. | |
2652 | // TODO cover transaction DB is not covered in this fault test too. | |
2653 | bool ingest_meta_error = false; | |
2654 | bool ingest_write_error = false; | |
2655 | bool ingest_read_error = false; | |
2656 | if ((FLAGS_open_metadata_write_fault_one_in || | |
2657 | FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) && | |
2658 | fault_fs_guard | |
2659 | ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr) | |
2660 | .ok()) { | |
2661 | if (!FLAGS_sync) { | |
2662 | // When DB Stress is not sync mode, we expect all WAL writes to | |
2663 | // WAL is durable. Buffering unsynced writes will cause false | |
2664 | // positive in crash tests. Before we figure out a way to | |
2665 | // solve it, skip WAL from failure injection. | |
2666 | fault_fs_guard->SetSkipDirectWritableTypes({kWalFile}); | |
2667 | } | |
2668 | ingest_meta_error = FLAGS_open_metadata_write_fault_one_in; | |
2669 | ingest_write_error = FLAGS_open_write_fault_one_in; | |
2670 | ingest_read_error = FLAGS_open_read_fault_one_in; | |
2671 | if (ingest_meta_error) { | |
2672 | fault_fs_guard->EnableMetadataWriteErrorInjection(); | |
2673 | fault_fs_guard->SetRandomMetadataWriteError( | |
2674 | FLAGS_open_metadata_write_fault_one_in); | |
2675 | } | |
2676 | if (ingest_write_error) { | |
2677 | fault_fs_guard->SetFilesystemDirectWritable(false); | |
2678 | fault_fs_guard->EnableWriteErrorInjection(); | |
2679 | fault_fs_guard->SetRandomWriteError( | |
2680 | static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in, | |
2681 | IOStatus::IOError("Injected Open Error"), | |
2682 | /*inject_for_all_file_types=*/true, /*types=*/{}); | |
f67539c2 | 2683 | } |
1e59de90 TL |
2684 | if (ingest_read_error) { |
2685 | fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in); | |
2686 | } | |
2687 | } | |
2688 | while (true) { | |
2689 | #ifndef ROCKSDB_LITE | |
2690 | // StackableDB-based BlobDB | |
2691 | if (FLAGS_use_blob_db) { | |
2692 | blob_db::BlobDBOptions blob_db_options; | |
2693 | blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size; | |
2694 | blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync; | |
2695 | blob_db_options.blob_file_size = FLAGS_blob_db_file_size; | |
2696 | blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc; | |
2697 | blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff; | |
2698 | ||
2699 | blob_db::BlobDB* blob_db = nullptr; | |
2700 | s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db, | |
2701 | cf_descriptors, &column_families_, | |
2702 | &blob_db); | |
2703 | if (s.ok()) { | |
2704 | db_ = blob_db; | |
2705 | } | |
2706 | } else | |
f67539c2 | 2707 | #endif // !ROCKSDB_LITE |
1e59de90 TL |
2708 | { |
2709 | if (db_preload_finished_.load() && FLAGS_read_only) { | |
2710 | s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, | |
2711 | cf_descriptors, &column_families_, &db_); | |
2712 | } else { | |
2713 | s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, | |
2714 | &column_families_, &db_); | |
2715 | } | |
f67539c2 | 2716 | } |
1e59de90 TL |
2717 | |
2718 | if (ingest_meta_error || ingest_write_error || ingest_read_error) { | |
2719 | fault_fs_guard->SetFilesystemDirectWritable(true); | |
2720 | fault_fs_guard->DisableMetadataWriteErrorInjection(); | |
2721 | fault_fs_guard->DisableWriteErrorInjection(); | |
2722 | fault_fs_guard->SetSkipDirectWritableTypes({}); | |
2723 | fault_fs_guard->SetRandomReadError(0); | |
2724 | if (s.ok()) { | |
2725 | // Ingested errors might happen in background compactions. We | |
2726 | // wait for all compactions to finish to make sure DB is in | |
2727 | // clean state before executing queries. | |
2728 | s = static_cast_with_check<DBImpl>(db_->GetRootDB()) | |
2729 | ->WaitForCompact(true /* wait_unscheduled */); | |
2730 | if (!s.ok()) { | |
2731 | for (auto cf : column_families_) { | |
2732 | delete cf; | |
2733 | } | |
2734 | column_families_.clear(); | |
2735 | delete db_; | |
2736 | db_ = nullptr; | |
2737 | } | |
2738 | } | |
2739 | if (!s.ok()) { | |
2740 | // After failure to opening a DB due to IO error, retry should | |
2741 | // successfully open the DB with correct data if no IO error shows | |
2742 | // up. | |
2743 | ingest_meta_error = false; | |
2744 | ingest_write_error = false; | |
2745 | ingest_read_error = false; | |
2746 | ||
2747 | Random rand(static_cast<uint32_t>(FLAGS_seed)); | |
2748 | if (rand.OneIn(2)) { | |
2749 | fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(), | |
2750 | nullptr); | |
2751 | } | |
2752 | if (rand.OneIn(3)) { | |
2753 | fault_fs_guard->DropUnsyncedFileData(); | |
2754 | } else if (rand.OneIn(2)) { | |
2755 | fault_fs_guard->DropRandomUnsyncedFileData(&rand); | |
2756 | } | |
2757 | continue; | |
2758 | } | |
2759 | } | |
2760 | break; | |
f67539c2 TL |
2761 | } |
2762 | } else { | |
2763 | #ifndef ROCKSDB_LITE | |
2764 | TransactionDBOptions txn_db_options; | |
2765 | assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED); | |
2766 | txn_db_options.write_policy = | |
2767 | static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy); | |
2768 | if (FLAGS_unordered_write) { | |
2769 | assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED); | |
2770 | options_.unordered_write = true; | |
2771 | options_.two_write_queues = true; | |
2772 | txn_db_options.skip_concurrency_control = true; | |
1e59de90 TL |
2773 | } else { |
2774 | options_.two_write_queues = FLAGS_two_write_queues; | |
f67539c2 | 2775 | } |
1e59de90 TL |
2776 | txn_db_options.wp_snapshot_cache_bits = |
2777 | static_cast<size_t>(FLAGS_wp_snapshot_cache_bits); | |
2778 | txn_db_options.wp_commit_cache_bits = | |
2779 | static_cast<size_t>(FLAGS_wp_commit_cache_bits); | |
2780 | PrepareTxnDbOptions(shared, txn_db_options); | |
f67539c2 TL |
2781 | s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, |
2782 | cf_descriptors, &column_families_, &txn_db_); | |
2783 | if (!s.ok()) { | |
2784 | fprintf(stderr, "Error in opening the TransactionDB [%s]\n", | |
2785 | s.ToString().c_str()); | |
2786 | fflush(stderr); | |
2787 | } | |
2788 | assert(s.ok()); | |
1e59de90 TL |
2789 | |
2790 | // Do not swap the order of the following. | |
2791 | { | |
2792 | db_ = txn_db_; | |
2793 | db_aptr_.store(txn_db_, std::memory_order_release); | |
f67539c2 | 2794 | } |
f67539c2 TL |
2795 | #endif |
2796 | } | |
1e59de90 TL |
2797 | if (!s.ok()) { |
2798 | fprintf(stderr, "Error in opening the DB [%s]\n", s.ToString().c_str()); | |
2799 | fflush(stderr); | |
2800 | } | |
2801 | assert(s.ok()); | |
2802 | assert(column_families_.size() == | |
2803 | static_cast<size_t>(FLAGS_column_families)); | |
f67539c2 | 2804 | |
1e59de90 TL |
2805 | // Secondary instance does not support write-prepared/write-unprepared |
2806 | // transactions, thus just disable secondary instance if we use | |
2807 | // transaction. | |
2808 | if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) { | |
f67539c2 | 2809 | #ifndef ROCKSDB_LITE |
f67539c2 TL |
2810 | Options tmp_opts; |
2811 | // TODO(yanqin) support max_open_files != -1 for secondary instance. | |
2812 | tmp_opts.max_open_files = -1; | |
f67539c2 | 2813 | tmp_opts.env = db_stress_env; |
1e59de90 TL |
2814 | const std::string& secondary_path = FLAGS_secondaries_base; |
2815 | s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, | |
2816 | cf_descriptors, &cmp_cfhs_, &cmp_db_); | |
f67539c2 | 2817 | assert(s.ok()); |
1e59de90 | 2818 | assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families)); |
f67539c2 TL |
2819 | #else |
2820 | fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); | |
2821 | exit(1); | |
1e59de90 | 2822 | #endif // !ROCKSDB_LITE |
f67539c2 TL |
2823 | } |
2824 | } else { | |
2825 | #ifndef ROCKSDB_LITE | |
2826 | DBWithTTL* db_with_ttl; | |
2827 | s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); | |
2828 | db_ = db_with_ttl; | |
f67539c2 TL |
2829 | #else |
2830 | fprintf(stderr, "TTL is not supported in RocksDBLite\n"); | |
2831 | exit(1); | |
2832 | #endif | |
2833 | } | |
1e59de90 TL |
2834 | |
2835 | if (FLAGS_preserve_unverified_changes) { | |
2836 | // Up until now, no live file should have become obsolete due to these | |
2837 | // options. After `DisableFileDeletions()` we can reenable auto compactions | |
2838 | // since, even if live files become obsolete, they won't be deleted. | |
2839 | assert(options_.avoid_flush_during_recovery); | |
2840 | assert(options_.disable_auto_compactions); | |
2841 | if (s.ok()) { | |
2842 | s = db_->DisableFileDeletions(); | |
2843 | } | |
2844 | if (s.ok()) { | |
2845 | s = db_->EnableAutoCompaction(column_families_); | |
2846 | } | |
2847 | } | |
2848 | ||
f67539c2 TL |
2849 | if (!s.ok()) { |
2850 | fprintf(stderr, "open error: %s\n", s.ToString().c_str()); | |
2851 | exit(1); | |
2852 | } | |
2853 | } | |
2854 | ||
// Tears down and re-opens the database mid-run to exercise recovery paths:
// cancels background work (mandatory for WritePrepared), deletes CF handles,
// randomly chooses between an explicit Close() and destructor-only teardown,
// then calls Open() and, when WAL-related fault testing is active, re-arms
// expected-state history tracking. Exits the process on tracing failure.
void StressTest::Reopen(ThreadState* thread) {
#ifndef ROCKSDB_LITE
  // BG jobs in WritePrepared must be canceled first because i) they can access
  // the db via a callback ii) they hold on to a snapshot and the upcoming
  // ::Close would complain about it.
  const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
  bool bg_canceled __attribute__((unused)) = false;
  if (write_prepared || thread->rand.OneIn(2)) {
    // WritePrepared must wait for cancellation; otherwise waiting is random.
    const bool wait =
        write_prepared || static_cast<bool>(thread->rand.OneIn(2));
    CancelAllBackgroundWork(db_, wait);
    bg_canceled = wait;
  }
  assert(!write_prepared || bg_canceled);
#else
  (void)thread;
#endif

  // Column family handles must be released before the DB is destroyed.
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();

#ifndef ROCKSDB_LITE
  // Randomly exercise explicit Close() in addition to the destructor path.
  if (thread->rand.OneIn(2)) {
    Status s = db_->Close();
    if (!s.ok()) {
      fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
      fflush(stderr);
    }
    assert(s.ok());
  }
#endif
  delete db_;
  db_ = nullptr;
#ifndef ROCKSDB_LITE
  // txn_db_ aliases db_ when transactions are in use; db_ was just deleted.
  txn_db_ = nullptr;
#endif

  num_times_reopened_++;
  auto now = clock_->NowMicros();
  fprintf(stdout, "%s Reopening database for the %dth time\n",
          clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
  Open(thread->shared);

  if ((FLAGS_sync_fault_injection || FLAGS_disable_wal ||
       FLAGS_manual_wal_flush_one_in > 0) &&
      IsStateTracked()) {
    Status s = thread->shared->SaveAtAndAfter(db_);
    if (!s.ok()) {
      fprintf(stderr, "Error enabling history tracing: %s\n",
              s.ToString().c_str());
      exit(1);
    }
  }
}
2911 | ||
2912 | bool StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread, | |
2913 | std::string& ts_str, | |
2914 | Slice& ts_slice, | |
2915 | ReadOptions& read_opts) { | |
2916 | if (FLAGS_user_timestamp_size == 0) { | |
2917 | return false; | |
2918 | } | |
f67539c2 | 2919 | |
1e59de90 TL |
2920 | assert(thread); |
2921 | if (!thread->rand.OneInOpt(3)) { | |
2922 | return false; | |
2923 | } | |
2924 | ||
2925 | const SharedState* const shared = thread->shared; | |
2926 | assert(shared); | |
2927 | const uint64_t start_ts = shared->GetStartTimestamp(); | |
2928 | ||
2929 | uint64_t now = db_stress_env->NowNanos(); | |
2930 | ||
2931 | assert(now > start_ts); | |
2932 | uint64_t time_diff = now - start_ts; | |
2933 | uint64_t ts = start_ts + (thread->rand.Next64() % time_diff); | |
2934 | ts_str.clear(); | |
2935 | PutFixed64(&ts_str, ts); | |
2936 | ts_slice = ts_str; | |
2937 | read_opts.timestamp = &ts_slice; | |
2938 | return true; | |
2939 | } | |
2940 | ||
2941 | void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread, | |
2942 | std::string& ts_str, | |
2943 | Slice& ts_slice, | |
2944 | ReadOptions& read_opts) { | |
2945 | if (FLAGS_user_timestamp_size == 0) { | |
2946 | return; | |
2947 | } | |
2948 | ||
2949 | assert(thread); | |
2950 | if (!thread->rand.OneInOpt(3)) { | |
2951 | return; | |
2952 | } | |
2953 | ||
2954 | const Slice* const saved_ts = read_opts.timestamp; | |
2955 | assert(saved_ts != nullptr); | |
2956 | ||
2957 | const SharedState* const shared = thread->shared; | |
2958 | assert(shared); | |
2959 | const uint64_t start_ts = shared->GetStartTimestamp(); | |
2960 | ||
2961 | uint64_t now = db_stress_env->NowNanos(); | |
2962 | ||
2963 | assert(now > start_ts); | |
2964 | uint64_t time_diff = now - start_ts; | |
2965 | uint64_t ts = start_ts + (thread->rand.Next64() % time_diff); | |
2966 | ts_str.clear(); | |
2967 | PutFixed64(&ts_str, ts); | |
2968 | ts_slice = ts_str; | |
2969 | read_opts.timestamp = &ts_slice; | |
2970 | ||
2971 | // TODO (yanqin): support Merge with iter_start_ts | |
2972 | if (!thread->rand.OneInOpt(3) || FLAGS_use_merge || FLAGS_use_full_merge_v1) { | |
2973 | return; | |
2974 | } | |
2975 | ||
2976 | ts_str.clear(); | |
2977 | PutFixed64(&ts_str, start_ts); | |
2978 | ts_slice = ts_str; | |
2979 | read_opts.iter_start_ts = &ts_slice; | |
2980 | read_opts.timestamp = saved_ts; | |
2981 | } | |
2982 | ||
// Validates that the current flag combination is compatible with user-defined
// timestamps and, if so, installs the u64-timestamp-aware bytewise comparator
// on `options`. Any unsupported combination (wrong timestamp size,
// transactions, BlobDB, batched/CF-consistency tests, external file
// ingestion) prints an error to stderr and terminates the process.
void CheckAndSetOptionsForUserTimestamp(Options& options) {
  assert(FLAGS_user_timestamp_size > 0);
  // Only 64-bit timestamps are supported: the test comparator fixes the size.
  const Comparator* const cmp = test::BytewiseComparatorWithU64TsWrapper();
  assert(cmp);
  if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
    fprintf(stderr,
            "Only -user_timestamp_size=%d is supported in stress test.\n",
            static_cast<int>(cmp->timestamp_size()));
    exit(1);
  }
  if (FLAGS_use_txn) {
    fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
    exit(1);
  }
#ifndef ROCKSDB_LITE
  if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
    fprintf(stderr, "BlobDB not supported with timestamp.\n");
    exit(1);
  }
#endif  // !ROCKSDB_LITE
  if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
    fprintf(stderr,
            "Due to per-key ts-seq ordering constraint, only the (default) "
            "non-batched test is supported with timestamp.\n");
    exit(1);
  }
  if (FLAGS_ingest_external_file_one_in > 0) {
    fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
    exit(1);
  }
  // All checks passed: make the DB timestamp-aware.
  options.comparator = cmp;
}
3015 | ||
3016 | bool InitializeOptionsFromFile(Options& options) { | |
3017 | #ifndef ROCKSDB_LITE | |
3018 | DBOptions db_options; | |
3019 | std::vector<ColumnFamilyDescriptor> cf_descriptors; | |
3020 | if (!FLAGS_options_file.empty()) { | |
3021 | Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env, | |
3022 | &db_options, &cf_descriptors); | |
3023 | if (!s.ok()) { | |
3024 | fprintf(stderr, "Unable to load options file %s --- %s\n", | |
3025 | FLAGS_options_file.c_str(), s.ToString().c_str()); | |
3026 | exit(1); | |
3027 | } | |
3028 | db_options.env = new DbStressEnvWrapper(db_stress_env); | |
3029 | options = Options(db_options, cf_descriptors[0].options); | |
3030 | return true; | |
3031 | } | |
3032 | #else | |
3033 | (void)options; | |
3034 | fprintf(stderr, "--options_file not supported in lite mode\n"); | |
3035 | exit(1); | |
3036 | #endif //! ROCKSDB_LITE | |
3037 | return false; | |
f67539c2 | 3038 | } |
1e59de90 TL |
3039 | |
// Builds `options` (including its embedded BlockBasedTableOptions) entirely
// from the db_stress command-line flags. The caches and filter policy are
// created by the caller and shared across DB reopens. Exits the process on
// invalid flag combinations (e.g. a standalone blob cache with size <= 0, or
// an unknown prepopulate-blob-cache mode).
void InitializeOptionsFromFlags(
    const std::shared_ptr<Cache>& cache,
    const std::shared_ptr<Cache>& block_cache_compressed,
    const std::shared_ptr<const FilterPolicy>& filter_policy,
    Options& options) {
  // ----- Block-based table (SST format / block cache) options -----
  BlockBasedTableOptions block_based_options;
  block_based_options.block_cache = cache;
  block_based_options.cache_index_and_filter_blocks =
      FLAGS_cache_index_and_filter_blocks;
  block_based_options.metadata_cache_options.top_level_index_pinning =
      static_cast<PinningTier>(FLAGS_top_level_index_pinning);
  block_based_options.metadata_cache_options.partition_pinning =
      static_cast<PinningTier>(FLAGS_partition_pinning);
  block_based_options.metadata_cache_options.unpartitioned_pinning =
      static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
  block_based_options.block_cache_compressed = block_cache_compressed;
  block_based_options.checksum = checksum_type_e;
  block_based_options.block_size = FLAGS_block_size;
  // Per-role cache-charging decisions, each driven by its own flag.
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
       {/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kFilterConstruction,
       {/*.charged = */ FLAGS_charge_filter_construction
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kBlockBasedTableReader,
       {/*.charged = */ FLAGS_charge_table_reader
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kFileMetadata,
       {/*.charged = */ FLAGS_charge_file_metadata
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kBlobCache,
       {/*.charged = */ FLAGS_charge_blob_cache
            ? CacheEntryRoleOptions::Decision::kEnabled
            : CacheEntryRoleOptions::Decision::kDisabled}});
  block_based_options.format_version =
      static_cast<uint32_t>(FLAGS_format_version);
  block_based_options.index_block_restart_interval =
      static_cast<int32_t>(FLAGS_index_block_restart_interval);
  block_based_options.filter_policy = filter_policy;
  block_based_options.partition_filters = FLAGS_partition_filters;
  block_based_options.optimize_filters_for_memory =
      FLAGS_optimize_filters_for_memory;
  block_based_options.detect_filter_construct_corruption =
      FLAGS_detect_filter_construct_corruption;
  block_based_options.index_type =
      static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
  block_based_options.data_block_index_type =
      static_cast<BlockBasedTableOptions::DataBlockIndexType>(
          FLAGS_data_block_index_type);
  block_based_options.prepopulate_block_cache =
      static_cast<BlockBasedTableOptions::PrepopulateBlockCache>(
          FLAGS_prepopulate_block_cache);
  block_based_options.initial_auto_readahead_size =
      FLAGS_initial_auto_readahead_size;
  block_based_options.max_auto_readahead_size = FLAGS_max_auto_readahead_size;
  block_based_options.num_file_reads_for_auto_readahead =
      FLAGS_num_file_reads_for_auto_readahead;
  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
  // ----- Write buffer / memtable options -----
  options.db_write_buffer_size = FLAGS_db_write_buffer_size;
  options.write_buffer_size = FLAGS_write_buffer_size;
  options.max_write_buffer_number = FLAGS_max_write_buffer_number;
  options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
  options.max_write_buffer_number_to_maintain =
      FLAGS_max_write_buffer_number_to_maintain;
  options.max_write_buffer_size_to_maintain =
      FLAGS_max_write_buffer_size_to_maintain;
  options.memtable_prefix_bloom_size_ratio =
      FLAGS_memtable_prefix_bloom_size_ratio;
  options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
  // ----- Compaction / LSM shape options -----
  options.disable_auto_compactions = FLAGS_disable_auto_compactions;
  options.max_background_compactions = FLAGS_max_background_compactions;
  options.max_background_flushes = FLAGS_max_background_flushes;
  options.compaction_style =
      static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
  options.compaction_pri =
      static_cast<ROCKSDB_NAMESPACE::CompactionPri>(FLAGS_compaction_pri);
  options.num_levels = FLAGS_num_levels;
  if (FLAGS_prefix_size >= 0) {
    // Negative prefix_size means "no prefix extractor".
    options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
  }
  options.max_open_files = FLAGS_open_files;
  options.statistics = dbstats;
  options.env = db_stress_env;
  options.use_fsync = FLAGS_use_fsync;
  options.compaction_readahead_size = FLAGS_compaction_readahead_size;
  options.allow_mmap_reads = FLAGS_mmap_read;
  options.allow_mmap_writes = FLAGS_mmap_write;
  options.use_direct_reads = FLAGS_use_direct_reads;
  options.use_direct_io_for_flush_and_compaction =
      FLAGS_use_direct_io_for_flush_and_compaction;
  options.recycle_log_file_num =
      static_cast<size_t>(FLAGS_recycle_log_file_num);
  options.target_file_size_base = FLAGS_target_file_size_base;
  options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
  options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
  options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier;
  options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
  options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger;
  options.level0_file_num_compaction_trigger =
      FLAGS_level0_file_num_compaction_trigger;
  // ----- Compression options -----
  options.compression = compression_type_e;
  options.bottommost_compression = bottommost_compression_type_e;
  options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
  options.compression_opts.zstd_max_train_bytes =
      FLAGS_compression_zstd_max_train_bytes;
  options.compression_opts.parallel_threads =
      FLAGS_compression_parallel_threads;
  options.compression_opts.max_dict_buffer_bytes =
      FLAGS_compression_max_dict_buffer_bytes;
  if (ZSTD_FinalizeDictionarySupported()) {
    options.compression_opts.use_zstd_dict_trainer =
        FLAGS_compression_use_zstd_dict_trainer;
  } else if (!FLAGS_compression_use_zstd_dict_trainer) {
    // The linked ZSTD is too old for finalizeDictionary; warn but proceed
    // with the trainer instead of failing the run.
    fprintf(
        stderr,
        "WARNING: use_zstd_dict_trainer is false but zstd finalizeDictionary "
        "cannot be used because ZSTD 1.4.5+ is not linked with the binary."
        " zstd dictionary trainer will be used.\n");
  }
  options.max_manifest_file_size = FLAGS_max_manifest_file_size;
  options.inplace_update_support = FLAGS_in_place_update;
  options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
  options.allow_concurrent_memtable_write =
      FLAGS_allow_concurrent_memtable_write;
  options.experimental_mempurge_threshold =
      FLAGS_experimental_mempurge_threshold;
  options.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
  options.stats_dump_period_sec =
      static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
  options.ttl = FLAGS_compaction_ttl;
  options.enable_pipelined_write = FLAGS_enable_pipelined_write;
  options.enable_write_thread_adaptive_yield =
      FLAGS_enable_write_thread_adaptive_yield;
  options.compaction_options_universal.size_ratio = FLAGS_universal_size_ratio;
  options.compaction_options_universal.min_merge_width =
      FLAGS_universal_min_merge_width;
  options.compaction_options_universal.max_merge_width =
      FLAGS_universal_max_merge_width;
  options.compaction_options_universal.max_size_amplification_percent =
      FLAGS_universal_max_size_amplification_percent;
  options.atomic_flush = FLAGS_atomic_flush;
  // Manual WAL flush is implied by a nonzero flush-injection frequency.
  options.manual_wal_flush = FLAGS_manual_wal_flush_one_in > 0 ? true : false;
  options.avoid_unnecessary_blocking_io = FLAGS_avoid_unnecessary_blocking_io;
  options.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
  options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
  options.max_write_batch_group_size_bytes =
      FLAGS_max_write_batch_group_size_bytes;
  options.level_compaction_dynamic_level_bytes =
      FLAGS_level_compaction_dynamic_level_bytes;
  // Always on in stress tests to maximize verification coverage.
  options.track_and_verify_wals_in_manifest = true;
  options.verify_sst_unique_id_in_manifest =
      FLAGS_verify_sst_unique_id_in_manifest;
  options.memtable_protection_bytes_per_key =
      FLAGS_memtable_protection_bytes_per_key;

  // Integrated BlobDB
  options.enable_blob_files = FLAGS_enable_blob_files;
  options.min_blob_size = FLAGS_min_blob_size;
  options.blob_file_size = FLAGS_blob_file_size;
  options.blob_compression_type =
      StringToCompressionType(FLAGS_blob_compression_type.c_str());
  options.enable_blob_garbage_collection = FLAGS_enable_blob_garbage_collection;
  options.blob_garbage_collection_age_cutoff =
      FLAGS_blob_garbage_collection_age_cutoff;
  options.blob_garbage_collection_force_threshold =
      FLAGS_blob_garbage_collection_force_threshold;
  options.blob_compaction_readahead_size = FLAGS_blob_compaction_readahead_size;
  options.blob_file_starting_level = FLAGS_blob_file_starting_level;

  if (FLAGS_use_blob_cache) {
    if (FLAGS_use_shared_block_and_blob_cache) {
      // Share one cache between data blocks and blobs.
      options.blob_cache = cache;
    } else {
      if (FLAGS_blob_cache_size > 0) {
        LRUCacheOptions co;
        co.capacity = FLAGS_blob_cache_size;
        co.num_shard_bits = FLAGS_blob_cache_numshardbits;
        options.blob_cache = NewLRUCache(co);
      } else {
        fprintf(stderr,
                "Unable to create a standalone blob cache if blob_cache_size "
                "<= 0.\n");
        exit(1);
      }
    }
    switch (FLAGS_prepopulate_blob_cache) {
      case 0:
        options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable;
        break;
      case 1:
        options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
        break;
      default:
        fprintf(stderr, "Unknown prepopulate blob cache mode\n");
        exit(1);
    }
  }

  options.wal_compression =
      StringToCompressionType(FLAGS_wal_compression.c_str());

  if (FLAGS_enable_tiered_storage) {
    options.bottommost_temperature = Temperature::kCold;
  }
  options.preclude_last_level_data_seconds =
      FLAGS_preclude_last_level_data_seconds;
  options.preserve_internal_time_seconds = FLAGS_preserve_internal_time_seconds;

  // ----- Memtable representation -----
  switch (FLAGS_rep_factory) {
    case kSkipList:
      // Skip list is the default memtable rep; nothing to configure.
      break;
#ifndef ROCKSDB_LITE
    case kHashSkipList:
      options.memtable_factory.reset(NewHashSkipListRepFactory(10000));
      break;
    case kVectorRep:
      options.memtable_factory.reset(new VectorRepFactory());
      break;
#else
    default:
      // LITE builds fall back to skip list; warn but continue.
      fprintf(stderr,
              "RocksdbLite only supports skip list mem table. Skip "
              "--rep_factory\n");
#endif  // ROCKSDB_LITE
  }

  if (FLAGS_use_full_merge_v1) {
    options.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
  } else {
    options.merge_operator = MergeOperators::CreatePutOperator();
  }

  if (FLAGS_enable_compaction_filter) {
    options.compaction_filter_factory =
        std::make_shared<DbStressCompactionFilterFactory>();
  }

  options.best_efforts_recovery = FLAGS_best_efforts_recovery;
  options.paranoid_file_checks = FLAGS_paranoid_file_checks;
  options.fail_if_options_file_error = FLAGS_fail_if_options_file_error;

  if (FLAGS_user_timestamp_size > 0) {
    // Validates flag compatibility and installs the timestamp comparator;
    // exits the process on unsupported combinations.
    CheckAndSetOptionsForUserTimestamp(options);
  }

  options.allow_data_in_errors = FLAGS_allow_data_in_errors;
}
3298 | ||
3299 | void InitializeOptionsGeneral( | |
3300 | const std::shared_ptr<Cache>& cache, | |
3301 | const std::shared_ptr<Cache>& block_cache_compressed, | |
3302 | const std::shared_ptr<const FilterPolicy>& filter_policy, | |
3303 | Options& options) { | |
3304 | options.create_missing_column_families = true; | |
3305 | options.create_if_missing = true; | |
3306 | ||
3307 | if (!options.statistics) { | |
3308 | options.statistics = dbstats; | |
3309 | } | |
3310 | ||
3311 | if (options.env == Options().env) { | |
3312 | options.env = db_stress_env; | |
3313 | } | |
3314 | ||
3315 | assert(options.table_factory); | |
3316 | auto table_options = | |
3317 | options.table_factory->GetOptions<BlockBasedTableOptions>(); | |
3318 | if (table_options) { | |
3319 | if (FLAGS_cache_size > 0) { | |
3320 | table_options->block_cache = cache; | |
3321 | } | |
3322 | if (!table_options->block_cache_compressed && | |
3323 | FLAGS_compressed_cache_size > 0) { | |
3324 | table_options->block_cache_compressed = block_cache_compressed; | |
3325 | } | |
3326 | if (!table_options->filter_policy) { | |
3327 | table_options->filter_policy = filter_policy; | |
3328 | } | |
3329 | } | |
3330 | ||
3331 | // TODO: row_cache, thread-pool IO priority, CPU priority. | |
3332 | ||
3333 | if (!options.rate_limiter) { | |
3334 | if (FLAGS_rate_limiter_bytes_per_sec > 0) { | |
3335 | options.rate_limiter.reset(NewGenericRateLimiter( | |
3336 | FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */, | |
3337 | 10 /* fairness */, | |
3338 | FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly | |
3339 | : RateLimiter::Mode::kWritesOnly)); | |
3340 | } | |
3341 | } | |
3342 | ||
3343 | if (!options.file_checksum_gen_factory) { | |
3344 | options.file_checksum_gen_factory = | |
3345 | GetFileChecksumImpl(FLAGS_file_checksum_impl); | |
3346 | } | |
3347 | ||
3348 | if (FLAGS_sst_file_manager_bytes_per_sec > 0 || | |
3349 | FLAGS_sst_file_manager_bytes_per_truncate > 0) { | |
3350 | Status status; | |
3351 | options.sst_file_manager.reset(NewSstFileManager( | |
3352 | db_stress_env, options.info_log, "" /* trash_dir */, | |
3353 | static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec), | |
3354 | true /* delete_existing_trash */, &status, | |
3355 | 0.25 /* max_trash_db_ratio */, | |
3356 | FLAGS_sst_file_manager_bytes_per_truncate)); | |
3357 | if (!status.ok()) { | |
3358 | fprintf(stderr, "SstFileManager creation failed: %s\n", | |
3359 | status.ToString().c_str()); | |
3360 | exit(1); | |
3361 | } | |
3362 | } | |
3363 | ||
3364 | if (FLAGS_preserve_unverified_changes) { | |
3365 | if (!options.avoid_flush_during_recovery) { | |
3366 | fprintf(stderr, | |
3367 | "WARNING: flipping `avoid_flush_during_recovery` to true for " | |
3368 | "`preserve_unverified_changes` to keep all files\n"); | |
3369 | options.avoid_flush_during_recovery = true; | |
3370 | } | |
3371 | // Together with `avoid_flush_during_recovery == true`, this will prevent | |
3372 | // live files from becoming obsolete and deleted between `DB::Open()` and | |
3373 | // `DisableFileDeletions()` due to flush or compaction. We do not need to | |
3374 | // warn the user since we will reenable compaction soon. | |
3375 | options.disable_auto_compactions = true; | |
3376 | } | |
3377 | ||
3378 | options.table_properties_collector_factories.emplace_back( | |
3379 | std::make_shared<DbStressTablePropertiesCollectorFactory>()); | |
3380 | } | |
3381 | ||
f67539c2 TL |
3382 | } // namespace ROCKSDB_NAMESPACE |
3383 | #endif // GFLAGS |