]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_test_util.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_test_util.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11
12 #include "db/forward_iterator.h"
13 #include "rocksdb/convenience.h"
14 #include "rocksdb/env_encryption.h"
15 #include "rocksdb/utilities/object_registry.h"
16 #include "util/random.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 namespace {
21 int64_t MaybeCurrentTime(Env* env) {
22 int64_t time = 1337346000; // arbitrary fallback default
23 env->GetCurrentTime(&time).PermitUncheckedError();
24 return time;
25 }
26 } // namespace
27
28 // Special Env used to delay background operations
29
30 SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep)
31 : EnvWrapper(base),
32 maybe_starting_time_(MaybeCurrentTime(base)),
33 rnd_(301),
34 sleep_counter_(this),
35 time_elapse_only_sleep_(time_elapse_only_sleep),
36 no_slowdown_(time_elapse_only_sleep) {
37 delay_sstable_sync_.store(false, std::memory_order_release);
38 drop_writes_.store(false, std::memory_order_release);
39 no_space_.store(false, std::memory_order_release);
40 non_writable_.store(false, std::memory_order_release);
41 count_random_reads_ = false;
42 count_sequential_reads_ = false;
43 manifest_sync_error_.store(false, std::memory_order_release);
44 manifest_write_error_.store(false, std::memory_order_release);
45 log_write_error_.store(false, std::memory_order_release);
46 random_file_open_counter_.store(0, std::memory_order_relaxed);
47 delete_count_.store(0, std::memory_order_relaxed);
48 num_open_wal_file_.store(0);
49 log_write_slowdown_ = 0;
50 bytes_written_ = 0;
51 sync_counter_ = 0;
52 non_writeable_rate_ = 0;
53 new_writable_count_ = 0;
54 non_writable_count_ = 0;
55 table_write_callback_ = nullptr;
56 }
57 DBTestBase::DBTestBase(const std::string path, bool env_do_fsync)
58 : mem_env_(nullptr), encrypted_env_(nullptr), option_config_(kDefault) {
59 Env* base_env = Env::Default();
60 #ifndef ROCKSDB_LITE
61 const char* test_env_uri = getenv("TEST_ENV_URI");
62 if (test_env_uri) {
63 Env* test_env = nullptr;
64 Status s = Env::LoadEnv(test_env_uri, &test_env, &env_guard_);
65 base_env = test_env;
66 EXPECT_OK(s);
67 EXPECT_NE(Env::Default(), base_env);
68 }
69 #endif // !ROCKSDB_LITE
70 EXPECT_NE(nullptr, base_env);
71 if (getenv("MEM_ENV")) {
72 mem_env_ = new MockEnv(base_env);
73 }
74 #ifndef ROCKSDB_LITE
75 if (getenv("ENCRYPTED_ENV")) {
76 std::shared_ptr<EncryptionProvider> provider;
77 Status s = EncryptionProvider::CreateFromString(
78 ConfigOptions(), std::string("test://") + getenv("ENCRYPTED_ENV"),
79 &provider);
80 encrypted_env_ = NewEncryptedEnv(mem_env_ ? mem_env_ : base_env, provider);
81 }
82 #endif // !ROCKSDB_LITE
83 env_ = new SpecialEnv(encrypted_env_ ? encrypted_env_
84 : (mem_env_ ? mem_env_ : base_env));
85 env_->SetBackgroundThreads(1, Env::LOW);
86 env_->SetBackgroundThreads(1, Env::HIGH);
87 env_->skip_fsync_ = !env_do_fsync;
88 dbname_ = test::PerThreadDBPath(env_, path);
89 alternative_wal_dir_ = dbname_ + "/wal";
90 alternative_db_log_dir_ = dbname_ + "/db_log_dir";
91 auto options = CurrentOptions();
92 options.env = env_;
93 auto delete_options = options;
94 delete_options.wal_dir = alternative_wal_dir_;
95 EXPECT_OK(DestroyDB(dbname_, delete_options));
96 // Destroy it for not alternative WAL dir is used.
97 EXPECT_OK(DestroyDB(dbname_, options));
98 db_ = nullptr;
99 Reopen(options);
100 Random::GetTLSInstance()->Reset(0xdeadbeef);
101 }
102
103 DBTestBase::~DBTestBase() {
104 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
105 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
106 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
107 Close();
108 Options options;
109 options.db_paths.emplace_back(dbname_, 0);
110 options.db_paths.emplace_back(dbname_ + "_2", 0);
111 options.db_paths.emplace_back(dbname_ + "_3", 0);
112 options.db_paths.emplace_back(dbname_ + "_4", 0);
113 options.env = env_;
114
115 if (getenv("KEEP_DB")) {
116 printf("DB is still at %s\n", dbname_.c_str());
117 } else {
118 EXPECT_OK(DestroyDB(dbname_, options));
119 }
120 delete env_;
121 }
122
123 bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) {
124 #ifdef ROCKSDB_LITE
125 // These options are not supported in ROCKSDB_LITE
126 if (option_config == kHashSkipList ||
127 option_config == kPlainTableFirstBytePrefix ||
128 option_config == kPlainTableCappedPrefix ||
129 option_config == kPlainTableCappedPrefixNonMmap ||
130 option_config == kPlainTableAllBytesPrefix ||
131 option_config == kVectorRep || option_config == kHashLinkList ||
132 option_config == kUniversalCompaction ||
133 option_config == kUniversalCompactionMultiLevel ||
134 option_config == kUniversalSubcompactions ||
135 option_config == kFIFOCompaction ||
136 option_config == kConcurrentSkipList) {
137 return true;
138 }
139 #endif
140
141 if ((skip_mask & kSkipUniversalCompaction) &&
142 (option_config == kUniversalCompaction ||
143 option_config == kUniversalCompactionMultiLevel ||
144 option_config == kUniversalSubcompactions)) {
145 return true;
146 }
147 if ((skip_mask & kSkipMergePut) && option_config == kMergePut) {
148 return true;
149 }
150 if ((skip_mask & kSkipNoSeekToLast) &&
151 (option_config == kHashLinkList || option_config == kHashSkipList)) {
152 return true;
153 }
154 if ((skip_mask & kSkipPlainTable) &&
155 (option_config == kPlainTableAllBytesPrefix ||
156 option_config == kPlainTableFirstBytePrefix ||
157 option_config == kPlainTableCappedPrefix ||
158 option_config == kPlainTableCappedPrefixNonMmap)) {
159 return true;
160 }
161 if ((skip_mask & kSkipHashIndex) &&
162 (option_config == kBlockBasedTableWithPrefixHashIndex ||
163 option_config == kBlockBasedTableWithWholeKeyHashIndex)) {
164 return true;
165 }
166 if ((skip_mask & kSkipFIFOCompaction) && option_config == kFIFOCompaction) {
167 return true;
168 }
169 if ((skip_mask & kSkipMmapReads) && option_config == kWalDirAndMmapReads) {
170 return true;
171 }
172 return false;
173 }
174
175 // Switch to a fresh database with the next option configuration to
176 // test. Return false if there are no more configurations to test.
177 bool DBTestBase::ChangeOptions(int skip_mask) {
178 for (option_config_++; option_config_ < kEnd; option_config_++) {
179 if (ShouldSkipOptions(option_config_, skip_mask)) {
180 continue;
181 }
182 break;
183 }
184
185 if (option_config_ >= kEnd) {
186 Destroy(last_options_);
187 return false;
188 } else {
189 auto options = CurrentOptions();
190 options.create_if_missing = true;
191 DestroyAndReopen(options);
192 return true;
193 }
194 }
195
196 // Switch between different compaction styles.
197 bool DBTestBase::ChangeCompactOptions() {
198 if (option_config_ == kDefault) {
199 option_config_ = kUniversalCompaction;
200 Destroy(last_options_);
201 auto options = CurrentOptions();
202 options.create_if_missing = true;
203 TryReopen(options);
204 return true;
205 } else if (option_config_ == kUniversalCompaction) {
206 option_config_ = kUniversalCompactionMultiLevel;
207 Destroy(last_options_);
208 auto options = CurrentOptions();
209 options.create_if_missing = true;
210 TryReopen(options);
211 return true;
212 } else if (option_config_ == kUniversalCompactionMultiLevel) {
213 option_config_ = kLevelSubcompactions;
214 Destroy(last_options_);
215 auto options = CurrentOptions();
216 assert(options.max_subcompactions > 1);
217 TryReopen(options);
218 return true;
219 } else if (option_config_ == kLevelSubcompactions) {
220 option_config_ = kUniversalSubcompactions;
221 Destroy(last_options_);
222 auto options = CurrentOptions();
223 assert(options.max_subcompactions > 1);
224 TryReopen(options);
225 return true;
226 } else {
227 return false;
228 }
229 }
230
231 // Switch between different WAL settings
232 bool DBTestBase::ChangeWalOptions() {
233 if (option_config_ == kDefault) {
234 option_config_ = kDBLogDir;
235 Destroy(last_options_);
236 auto options = CurrentOptions();
237 Destroy(options);
238 options.create_if_missing = true;
239 TryReopen(options);
240 return true;
241 } else if (option_config_ == kDBLogDir) {
242 option_config_ = kWalDirAndMmapReads;
243 Destroy(last_options_);
244 auto options = CurrentOptions();
245 Destroy(options);
246 options.create_if_missing = true;
247 TryReopen(options);
248 return true;
249 } else if (option_config_ == kWalDirAndMmapReads) {
250 option_config_ = kRecycleLogFiles;
251 Destroy(last_options_);
252 auto options = CurrentOptions();
253 Destroy(options);
254 TryReopen(options);
255 return true;
256 } else {
257 return false;
258 }
259 }
260
261 // Switch between different filter policy
262 // Jump from kDefault to kFilter to kFullFilter
263 bool DBTestBase::ChangeFilterOptions() {
264 if (option_config_ == kDefault) {
265 option_config_ = kFilter;
266 } else if (option_config_ == kFilter) {
267 option_config_ = kFullFilterWithNewTableReaderForCompactions;
268 } else if (option_config_ == kFullFilterWithNewTableReaderForCompactions) {
269 option_config_ = kPartitionedFilterWithNewTableReaderForCompactions;
270 } else {
271 return false;
272 }
273 Destroy(last_options_);
274
275 auto options = CurrentOptions();
276 options.create_if_missing = true;
277 TryReopen(options);
278 return true;
279 }
280
281 // Switch between different DB options for file ingestion tests.
282 bool DBTestBase::ChangeOptionsForFileIngestionTest() {
283 if (option_config_ == kDefault) {
284 option_config_ = kUniversalCompaction;
285 Destroy(last_options_);
286 auto options = CurrentOptions();
287 options.create_if_missing = true;
288 TryReopen(options);
289 return true;
290 } else if (option_config_ == kUniversalCompaction) {
291 option_config_ = kUniversalCompactionMultiLevel;
292 Destroy(last_options_);
293 auto options = CurrentOptions();
294 options.create_if_missing = true;
295 TryReopen(options);
296 return true;
297 } else if (option_config_ == kUniversalCompactionMultiLevel) {
298 option_config_ = kLevelSubcompactions;
299 Destroy(last_options_);
300 auto options = CurrentOptions();
301 assert(options.max_subcompactions > 1);
302 TryReopen(options);
303 return true;
304 } else if (option_config_ == kLevelSubcompactions) {
305 option_config_ = kUniversalSubcompactions;
306 Destroy(last_options_);
307 auto options = CurrentOptions();
308 assert(options.max_subcompactions > 1);
309 TryReopen(options);
310 return true;
311 } else if (option_config_ == kUniversalSubcompactions) {
312 option_config_ = kDirectIO;
313 Destroy(last_options_);
314 auto options = CurrentOptions();
315 TryReopen(options);
316 return true;
317 } else {
318 return false;
319 }
320 }
321
322 // Return the current option configuration.
323 Options DBTestBase::CurrentOptions(
324 const anon::OptionsOverride& options_override) const {
325 return GetOptions(option_config_, GetDefaultOptions(), options_override);
326 }
327
328 Options DBTestBase::CurrentOptions(
329 const Options& default_options,
330 const anon::OptionsOverride& options_override) const {
331 return GetOptions(option_config_, default_options, options_override);
332 }
333
334 Options DBTestBase::GetDefaultOptions() const {
335 Options options;
336 options.write_buffer_size = 4090 * 4096;
337 options.target_file_size_base = 2 * 1024 * 1024;
338 options.max_bytes_for_level_base = 10 * 1024 * 1024;
339 options.max_open_files = 5000;
340 options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
341 options.compaction_pri = CompactionPri::kByCompensatedSize;
342 options.env = env_;
343 if (!env_->skip_fsync_) {
344 options.track_and_verify_wals_in_manifest = true;
345 }
346 return options;
347 }
348
349 Options DBTestBase::GetOptions(
350 int option_config, const Options& default_options,
351 const anon::OptionsOverride& options_override) const {
352 // this redundant copy is to minimize code change w/o having lint error.
353 Options options = default_options;
354 BlockBasedTableOptions table_options;
355 bool set_block_based_table_factory = true;
356 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
357 !defined(OS_AIX)
358 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
359 "NewRandomAccessFile:O_DIRECT");
360 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
361 "NewWritableFile:O_DIRECT");
362 #endif
363
364 bool can_allow_mmap = IsMemoryMappedAccessSupported();
365 switch (option_config) {
366 #ifndef ROCKSDB_LITE
367 case kHashSkipList:
368 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
369 options.memtable_factory.reset(NewHashSkipListRepFactory(16));
370 options.allow_concurrent_memtable_write = false;
371 options.unordered_write = false;
372 break;
373 case kPlainTableFirstBytePrefix:
374 options.table_factory.reset(NewPlainTableFactory());
375 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
376 options.allow_mmap_reads = can_allow_mmap;
377 options.max_sequential_skip_in_iterations = 999999;
378 set_block_based_table_factory = false;
379 break;
380 case kPlainTableCappedPrefix:
381 options.table_factory.reset(NewPlainTableFactory());
382 options.prefix_extractor.reset(NewCappedPrefixTransform(8));
383 options.allow_mmap_reads = can_allow_mmap;
384 options.max_sequential_skip_in_iterations = 999999;
385 set_block_based_table_factory = false;
386 break;
387 case kPlainTableCappedPrefixNonMmap:
388 options.table_factory.reset(NewPlainTableFactory());
389 options.prefix_extractor.reset(NewCappedPrefixTransform(8));
390 options.allow_mmap_reads = false;
391 options.max_sequential_skip_in_iterations = 999999;
392 set_block_based_table_factory = false;
393 break;
394 case kPlainTableAllBytesPrefix:
395 options.table_factory.reset(NewPlainTableFactory());
396 options.prefix_extractor.reset(NewNoopTransform());
397 options.allow_mmap_reads = can_allow_mmap;
398 options.max_sequential_skip_in_iterations = 999999;
399 set_block_based_table_factory = false;
400 break;
401 case kVectorRep:
402 options.memtable_factory.reset(new VectorRepFactory(100));
403 options.allow_concurrent_memtable_write = false;
404 options.unordered_write = false;
405 break;
406 case kHashLinkList:
407 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
408 options.memtable_factory.reset(
409 NewHashLinkListRepFactory(4, 0, 3, true, 4));
410 options.allow_concurrent_memtable_write = false;
411 options.unordered_write = false;
412 break;
413 case kDirectIO: {
414 options.use_direct_reads = true;
415 options.use_direct_io_for_flush_and_compaction = true;
416 options.compaction_readahead_size = 2 * 1024 * 1024;
417 SetupSyncPointsToMockDirectIO();
418 break;
419 }
420 #endif // ROCKSDB_LITE
421 case kMergePut:
422 options.merge_operator = MergeOperators::CreatePutOperator();
423 break;
424 case kFilter:
425 table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
426 break;
427 case kFullFilterWithNewTableReaderForCompactions:
428 table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
429 options.new_table_reader_for_compaction_inputs = true;
430 options.compaction_readahead_size = 10 * 1024 * 1024;
431 break;
432 case kPartitionedFilterWithNewTableReaderForCompactions:
433 table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
434 table_options.partition_filters = true;
435 table_options.index_type =
436 BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
437 options.new_table_reader_for_compaction_inputs = true;
438 options.compaction_readahead_size = 10 * 1024 * 1024;
439 break;
440 case kUncompressed:
441 options.compression = kNoCompression;
442 break;
443 case kNumLevel_3:
444 options.num_levels = 3;
445 break;
446 case kDBLogDir:
447 options.db_log_dir = alternative_db_log_dir_;
448 break;
449 case kWalDirAndMmapReads:
450 options.wal_dir = alternative_wal_dir_;
451 // mmap reads should be orthogonal to WalDir setting, so we piggyback to
452 // this option config to test mmap reads as well
453 options.allow_mmap_reads = can_allow_mmap;
454 break;
455 case kManifestFileSize:
456 options.max_manifest_file_size = 50; // 50 bytes
457 break;
458 case kPerfOptions:
459 options.soft_rate_limit = 2.0;
460 options.delayed_write_rate = 8 * 1024 * 1024;
461 options.report_bg_io_stats = true;
462 // TODO(3.13) -- test more options
463 break;
464 case kUniversalCompaction:
465 options.compaction_style = kCompactionStyleUniversal;
466 options.num_levels = 1;
467 break;
468 case kUniversalCompactionMultiLevel:
469 options.compaction_style = kCompactionStyleUniversal;
470 options.num_levels = 8;
471 break;
472 case kCompressedBlockCache:
473 options.allow_mmap_writes = can_allow_mmap;
474 table_options.block_cache_compressed = NewLRUCache(8 * 1024 * 1024);
475 break;
476 case kInfiniteMaxOpenFiles:
477 options.max_open_files = -1;
478 break;
479 case kxxHashChecksum: {
480 table_options.checksum = kxxHash;
481 break;
482 }
483 case kxxHash64Checksum: {
484 table_options.checksum = kxxHash64;
485 break;
486 }
487 case kFIFOCompaction: {
488 options.compaction_style = kCompactionStyleFIFO;
489 break;
490 }
491 case kBlockBasedTableWithPrefixHashIndex: {
492 table_options.index_type = BlockBasedTableOptions::kHashSearch;
493 options.prefix_extractor.reset(NewFixedPrefixTransform(1));
494 break;
495 }
496 case kBlockBasedTableWithWholeKeyHashIndex: {
497 table_options.index_type = BlockBasedTableOptions::kHashSearch;
498 options.prefix_extractor.reset(NewNoopTransform());
499 break;
500 }
501 case kBlockBasedTableWithPartitionedIndex: {
502 table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
503 options.prefix_extractor.reset(NewNoopTransform());
504 break;
505 }
506 case kBlockBasedTableWithPartitionedIndexFormat4: {
507 table_options.format_version = 4;
508 // Format 4 changes the binary index format. Since partitioned index is a
509 // super-set of simple indexes, we are also using kTwoLevelIndexSearch to
510 // test this format.
511 table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
512 // The top-level index in partition filters are also affected by format 4.
513 table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
514 table_options.partition_filters = true;
515 table_options.index_block_restart_interval = 8;
516 break;
517 }
518 case kBlockBasedTableWithIndexRestartInterval: {
519 table_options.index_block_restart_interval = 8;
520 break;
521 }
522 case kOptimizeFiltersForHits: {
523 options.optimize_filters_for_hits = true;
524 set_block_based_table_factory = true;
525 break;
526 }
527 case kRowCache: {
528 options.row_cache = NewLRUCache(1024 * 1024);
529 break;
530 }
531 case kRecycleLogFiles: {
532 options.recycle_log_file_num = 2;
533 break;
534 }
535 case kLevelSubcompactions: {
536 options.max_subcompactions = 4;
537 break;
538 }
539 case kUniversalSubcompactions: {
540 options.compaction_style = kCompactionStyleUniversal;
541 options.num_levels = 8;
542 options.max_subcompactions = 4;
543 break;
544 }
545 case kConcurrentSkipList: {
546 options.allow_concurrent_memtable_write = true;
547 options.enable_write_thread_adaptive_yield = true;
548 break;
549 }
550 case kPipelinedWrite: {
551 options.enable_pipelined_write = true;
552 break;
553 }
554 case kConcurrentWALWrites: {
555 // This options optimize 2PC commit path
556 options.two_write_queues = true;
557 options.manual_wal_flush = true;
558 break;
559 }
560 case kUnorderedWrite: {
561 options.allow_concurrent_memtable_write = false;
562 options.unordered_write = false;
563 break;
564 }
565
566 default:
567 break;
568 }
569
570 if (options_override.filter_policy) {
571 table_options.filter_policy = options_override.filter_policy;
572 table_options.partition_filters = options_override.partition_filters;
573 table_options.metadata_block_size = options_override.metadata_block_size;
574 }
575 if (set_block_based_table_factory) {
576 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
577 }
578 options.env = env_;
579 options.create_if_missing = true;
580 options.fail_if_options_file_error = true;
581 return options;
582 }
583
584 void DBTestBase::CreateColumnFamilies(const std::vector<std::string>& cfs,
585 const Options& options) {
586 ColumnFamilyOptions cf_opts(options);
587 size_t cfi = handles_.size();
588 handles_.resize(cfi + cfs.size());
589 for (auto cf : cfs) {
590 Status s = db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]);
591 ASSERT_OK(s);
592 }
593 }
594
595 void DBTestBase::CreateAndReopenWithCF(const std::vector<std::string>& cfs,
596 const Options& options) {
597 CreateColumnFamilies(cfs, options);
598 std::vector<std::string> cfs_plus_default = cfs;
599 cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
600 ReopenWithColumnFamilies(cfs_plus_default, options);
601 }
602
603 void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
604 const std::vector<Options>& options) {
605 ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
606 }
607
608 void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
609 const Options& options) {
610 ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
611 }
612
613 void DBTestBase::SetTimeElapseOnlySleepOnReopen(DBOptions* options) {
614 time_elapse_only_sleep_on_reopen_ = true;
615
616 // Need to disable stats dumping and persisting which also use
617 // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
618 // With time_elapse_only_sleep_, this can hang on some platforms (MacOS)
619 // because (a) on some platforms, pthread_cond_timedwait does not appear
620 // to release the lock for other threads to operate if the deadline time
621 // is already passed, and (b) TimedWait calls are currently a bad abstraction
622 // because the deadline parameter is usually computed from Env time,
623 // but is interpreted in real clock time.
624 options->stats_dump_period_sec = 0;
625 options->stats_persist_period_sec = 0;
626 }
627
628 void DBTestBase::MaybeInstallTimeElapseOnlySleep(const DBOptions& options) {
629 if (time_elapse_only_sleep_on_reopen_) {
630 assert(options.env == env_ ||
631 static_cast_with_check<CompositeEnvWrapper>(options.env)
632 ->env_target() == env_);
633 assert(options.stats_dump_period_sec == 0);
634 assert(options.stats_persist_period_sec == 0);
635 // We cannot set these before destroying the last DB because they might
636 // cause a deadlock or similar without the appropriate options set in
637 // the DB.
638 env_->time_elapse_only_sleep_ = true;
639 env_->no_slowdown_ = true;
640 } else {
641 // Going back in same test run is not yet supported, so no
642 // reset in this case.
643 }
644 }
645
646 Status DBTestBase::TryReopenWithColumnFamilies(
647 const std::vector<std::string>& cfs, const std::vector<Options>& options) {
648 Close();
649 EXPECT_EQ(cfs.size(), options.size());
650 std::vector<ColumnFamilyDescriptor> column_families;
651 for (size_t i = 0; i < cfs.size(); ++i) {
652 column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
653 }
654 DBOptions db_opts = DBOptions(options[0]);
655 last_options_ = options[0];
656 MaybeInstallTimeElapseOnlySleep(db_opts);
657 return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
658 }
659
660 Status DBTestBase::TryReopenWithColumnFamilies(
661 const std::vector<std::string>& cfs, const Options& options) {
662 Close();
663 std::vector<Options> v_opts(cfs.size(), options);
664 return TryReopenWithColumnFamilies(cfs, v_opts);
665 }
666
667 void DBTestBase::Reopen(const Options& options) {
668 ASSERT_OK(TryReopen(options));
669 }
670
671 void DBTestBase::Close() {
672 for (auto h : handles_) {
673 EXPECT_OK(db_->DestroyColumnFamilyHandle(h));
674 }
675 handles_.clear();
676 delete db_;
677 db_ = nullptr;
678 }
679
680 void DBTestBase::DestroyAndReopen(const Options& options) {
681 // Destroy using last options
682 Destroy(last_options_);
683 ASSERT_OK(TryReopen(options));
684 }
685
686 void DBTestBase::Destroy(const Options& options, bool delete_cf_paths) {
687 std::vector<ColumnFamilyDescriptor> column_families;
688 if (delete_cf_paths) {
689 for (size_t i = 0; i < handles_.size(); ++i) {
690 ColumnFamilyDescriptor cfdescriptor;
691 // GetDescriptor is not implemented for ROCKSDB_LITE
692 handles_[i]->GetDescriptor(&cfdescriptor).PermitUncheckedError();
693 column_families.push_back(cfdescriptor);
694 }
695 }
696 Close();
697 ASSERT_OK(DestroyDB(dbname_, options, column_families));
698 }
699
700 Status DBTestBase::ReadOnlyReopen(const Options& options) {
701 MaybeInstallTimeElapseOnlySleep(options);
702 return DB::OpenForReadOnly(options, dbname_, &db_);
703 }
704
705 Status DBTestBase::TryReopen(const Options& options) {
706 Close();
707 last_options_.table_factory.reset();
708 // Note: operator= is an unsafe approach here since it destructs
709 // std::shared_ptr in the same order of their creation, in contrast to
710 // destructors which destructs them in the opposite order of creation. One
711 // particular problem is that the cache destructor might invoke callback
712 // functions that use Option members such as statistics. To work around this
713 // problem, we manually call destructor of table_factory which eventually
714 // clears the block cache.
715 last_options_ = options;
716 MaybeInstallTimeElapseOnlySleep(options);
717 return DB::Open(options, dbname_, &db_);
718 }
719
720 bool DBTestBase::IsDirectIOSupported() {
721 return test::IsDirectIOSupported(env_, dbname_);
722 }
723
724 bool DBTestBase::IsMemoryMappedAccessSupported() const {
725 return (!encrypted_env_);
726 }
727
728 Status DBTestBase::Flush(int cf) {
729 if (cf == 0) {
730 return db_->Flush(FlushOptions());
731 } else {
732 return db_->Flush(FlushOptions(), handles_[cf]);
733 }
734 }
735
736 Status DBTestBase::Flush(const std::vector<int>& cf_ids) {
737 std::vector<ColumnFamilyHandle*> cfhs;
738 std::for_each(cf_ids.begin(), cf_ids.end(),
739 [&cfhs, this](int id) { cfhs.emplace_back(handles_[id]); });
740 return db_->Flush(FlushOptions(), cfhs);
741 }
742
743 Status DBTestBase::Put(const Slice& k, const Slice& v, WriteOptions wo) {
744 if (kMergePut == option_config_) {
745 return db_->Merge(wo, k, v);
746 } else {
747 return db_->Put(wo, k, v);
748 }
749 }
750
751 Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
752 WriteOptions wo) {
753 if (kMergePut == option_config_) {
754 return db_->Merge(wo, handles_[cf], k, v);
755 } else {
756 return db_->Put(wo, handles_[cf], k, v);
757 }
758 }
759
760 Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) {
761 return db_->Merge(wo, k, v);
762 }
763
764 Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v,
765 WriteOptions wo) {
766 return db_->Merge(wo, handles_[cf], k, v);
767 }
768
769 Status DBTestBase::Delete(const std::string& k) {
770 return db_->Delete(WriteOptions(), k);
771 }
772
773 Status DBTestBase::Delete(int cf, const std::string& k) {
774 return db_->Delete(WriteOptions(), handles_[cf], k);
775 }
776
777 Status DBTestBase::SingleDelete(const std::string& k) {
778 return db_->SingleDelete(WriteOptions(), k);
779 }
780
781 Status DBTestBase::SingleDelete(int cf, const std::string& k) {
782 return db_->SingleDelete(WriteOptions(), handles_[cf], k);
783 }
784
785 bool DBTestBase::SetPreserveDeletesSequenceNumber(SequenceNumber sn) {
786 return db_->SetPreserveDeletesSequenceNumber(sn);
787 }
788
789 std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) {
790 ReadOptions options;
791 options.verify_checksums = true;
792 options.snapshot = snapshot;
793 std::string result;
794 Status s = db_->Get(options, k, &result);
795 if (s.IsNotFound()) {
796 result = "NOT_FOUND";
797 } else if (!s.ok()) {
798 result = s.ToString();
799 }
800 return result;
801 }
802
803 std::string DBTestBase::Get(int cf, const std::string& k,
804 const Snapshot* snapshot) {
805 ReadOptions options;
806 options.verify_checksums = true;
807 options.snapshot = snapshot;
808 std::string result;
809 Status s = db_->Get(options, handles_[cf], k, &result);
810 if (s.IsNotFound()) {
811 result = "NOT_FOUND";
812 } else if (!s.ok()) {
813 result = s.ToString();
814 }
815 return result;
816 }
817
818 std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
819 const std::vector<std::string>& k,
820 const Snapshot* snapshot,
821 const bool batched) {
822 ReadOptions options;
823 options.verify_checksums = true;
824 options.snapshot = snapshot;
825 std::vector<ColumnFamilyHandle*> handles;
826 std::vector<Slice> keys;
827 std::vector<std::string> result;
828
829 for (unsigned int i = 0; i < cfs.size(); ++i) {
830 handles.push_back(handles_[cfs[i]]);
831 keys.push_back(k[i]);
832 }
833 std::vector<Status> s;
834 if (!batched) {
835 s = db_->MultiGet(options, handles, keys, &result);
836 for (unsigned int i = 0; i < s.size(); ++i) {
837 if (s[i].IsNotFound()) {
838 result[i] = "NOT_FOUND";
839 } else if (!s[i].ok()) {
840 result[i] = s[i].ToString();
841 }
842 }
843 } else {
844 std::vector<PinnableSlice> pin_values(cfs.size());
845 result.resize(cfs.size());
846 s.resize(cfs.size());
847 db_->MultiGet(options, cfs.size(), handles.data(), keys.data(),
848 pin_values.data(), s.data());
849 for (unsigned int i = 0; i < s.size(); ++i) {
850 if (s[i].IsNotFound()) {
851 result[i] = "NOT_FOUND";
852 } else if (!s[i].ok()) {
853 result[i] = s[i].ToString();
854 } else {
855 result[i].assign(pin_values[i].data(), pin_values[i].size());
856 }
857 }
858 }
859 return result;
860 }
861
862 std::vector<std::string> DBTestBase::MultiGet(const std::vector<std::string>& k,
863 const Snapshot* snapshot) {
864 ReadOptions options;
865 options.verify_checksums = true;
866 options.snapshot = snapshot;
867 std::vector<Slice> keys;
868 std::vector<std::string> result;
869 std::vector<Status> statuses(k.size());
870 std::vector<PinnableSlice> pin_values(k.size());
871
872 for (unsigned int i = 0; i < k.size(); ++i) {
873 keys.push_back(k[i]);
874 }
875 db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
876 keys.data(), pin_values.data(), statuses.data());
877 result.resize(k.size());
878 for (auto iter = result.begin(); iter != result.end(); ++iter) {
879 iter->assign(pin_values[iter - result.begin()].data(),
880 pin_values[iter - result.begin()].size());
881 }
882 for (unsigned int i = 0; i < statuses.size(); ++i) {
883 if (statuses[i].IsNotFound()) {
884 result[i] = "NOT_FOUND";
885 }
886 }
887 return result;
888 }
889
890 Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
891 ReadOptions options;
892 options.verify_checksums = true;
893 Status s = dbfull()->Get(options, dbfull()->DefaultColumnFamily(), k, v);
894 return s;
895 }
896
897 uint64_t DBTestBase::GetNumSnapshots() {
898 uint64_t int_num;
899 EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.num-snapshots", &int_num));
900 return int_num;
901 }
902
903 uint64_t DBTestBase::GetTimeOldestSnapshots() {
904 uint64_t int_num;
905 EXPECT_TRUE(
906 dbfull()->GetIntProperty("rocksdb.oldest-snapshot-time", &int_num));
907 return int_num;
908 }
909
910 uint64_t DBTestBase::GetSequenceOldestSnapshots() {
911 uint64_t int_num;
912 EXPECT_TRUE(
913 dbfull()->GetIntProperty("rocksdb.oldest-snapshot-sequence", &int_num));
914 return int_num;
915 }
916
917 // Return a string that contains all key,value pairs in order,
918 // formatted like "(k1->v1)(k2->v2)".
919 std::string DBTestBase::Contents(int cf) {
920 std::vector<std::string> forward;
921 std::string result;
922 Iterator* iter = (cf == 0) ? db_->NewIterator(ReadOptions())
923 : db_->NewIterator(ReadOptions(), handles_[cf]);
924 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
925 std::string s = IterStatus(iter);
926 result.push_back('(');
927 result.append(s);
928 result.push_back(')');
929 forward.push_back(s);
930 }
931
932 // Check reverse iteration results are the reverse of forward results
933 unsigned int matched = 0;
934 for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
935 EXPECT_LT(matched, forward.size());
936 EXPECT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
937 matched++;
938 }
939 EXPECT_EQ(matched, forward.size());
940
941 delete iter;
942 return result;
943 }
944
945 std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
946 Arena arena;
947 auto options = CurrentOptions();
948 InternalKeyComparator icmp(options.comparator);
949 ReadRangeDelAggregator range_del_agg(&icmp,
950 kMaxSequenceNumber /* upper_bound */);
951 ReadOptions read_options;
952 ScopedArenaIterator iter;
953 if (cf == 0) {
954 iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
955 kMaxSequenceNumber));
956 } else {
957 iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
958 kMaxSequenceNumber, handles_[cf]));
959 }
960 InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
961 iter->Seek(target.Encode());
962 std::string result;
963 if (!iter->status().ok()) {
964 result = iter->status().ToString();
965 } else {
966 result = "[ ";
967 bool first = true;
968 while (iter->Valid()) {
969 ParsedInternalKey ikey(Slice(), 0, kTypeValue);
970 if (ParseInternalKey(iter->key(), &ikey, true /* log_err_key */) !=
971 Status::OK()) {
972 result += "CORRUPTED";
973 } else {
974 if (!last_options_.comparator->Equal(ikey.user_key, user_key)) {
975 break;
976 }
977 if (!first) {
978 result += ", ";
979 }
980 first = false;
981 switch (ikey.type) {
982 case kTypeValue:
983 result += iter->value().ToString();
984 break;
985 case kTypeMerge:
986 // keep it the same as kTypeValue for testing kMergePut
987 result += iter->value().ToString();
988 break;
989 case kTypeDeletion:
990 result += "DEL";
991 break;
992 case kTypeSingleDeletion:
993 result += "SDEL";
994 break;
995 default:
996 assert(false);
997 break;
998 }
999 }
1000 iter->Next();
1001 }
1002 if (!first) {
1003 result += " ";
1004 }
1005 result += "]";
1006 }
1007 return result;
1008 }
1009
1010 #ifndef ROCKSDB_LITE
1011 int DBTestBase::NumSortedRuns(int cf) {
1012 ColumnFamilyMetaData cf_meta;
1013 if (cf == 0) {
1014 db_->GetColumnFamilyMetaData(&cf_meta);
1015 } else {
1016 db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
1017 }
1018 int num_sr = static_cast<int>(cf_meta.levels[0].files.size());
1019 for (size_t i = 1U; i < cf_meta.levels.size(); i++) {
1020 if (cf_meta.levels[i].files.size() > 0) {
1021 num_sr++;
1022 }
1023 }
1024 return num_sr;
1025 }
1026
1027 uint64_t DBTestBase::TotalSize(int cf) {
1028 ColumnFamilyMetaData cf_meta;
1029 if (cf == 0) {
1030 db_->GetColumnFamilyMetaData(&cf_meta);
1031 } else {
1032 db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
1033 }
1034 return cf_meta.size;
1035 }
1036
1037 uint64_t DBTestBase::SizeAtLevel(int level) {
1038 std::vector<LiveFileMetaData> metadata;
1039 db_->GetLiveFilesMetaData(&metadata);
1040 uint64_t sum = 0;
1041 for (const auto& m : metadata) {
1042 if (m.level == level) {
1043 sum += m.size;
1044 }
1045 }
1046 return sum;
1047 }
1048
1049 size_t DBTestBase::TotalLiveFiles(int cf) {
1050 ColumnFamilyMetaData cf_meta;
1051 if (cf == 0) {
1052 db_->GetColumnFamilyMetaData(&cf_meta);
1053 } else {
1054 db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
1055 }
1056 size_t num_files = 0;
1057 for (auto& level : cf_meta.levels) {
1058 num_files += level.files.size();
1059 }
1060 return num_files;
1061 }
1062
1063 size_t DBTestBase::CountLiveFiles() {
1064 std::vector<LiveFileMetaData> metadata;
1065 db_->GetLiveFilesMetaData(&metadata);
1066 return metadata.size();
1067 }
1068
1069 int DBTestBase::NumTableFilesAtLevel(int level, int cf) {
1070 std::string property;
1071 if (cf == 0) {
1072 // default cfd
1073 EXPECT_TRUE(db_->GetProperty(
1074 "rocksdb.num-files-at-level" + NumberToString(level), &property));
1075 } else {
1076 EXPECT_TRUE(db_->GetProperty(
1077 handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
1078 &property));
1079 }
1080 return atoi(property.c_str());
1081 }
1082
1083 double DBTestBase::CompressionRatioAtLevel(int level, int cf) {
1084 std::string property;
1085 if (cf == 0) {
1086 // default cfd
1087 EXPECT_TRUE(db_->GetProperty(
1088 "rocksdb.compression-ratio-at-level" + NumberToString(level),
1089 &property));
1090 } else {
1091 EXPECT_TRUE(db_->GetProperty(
1092 handles_[cf],
1093 "rocksdb.compression-ratio-at-level" + NumberToString(level),
1094 &property));
1095 }
1096 return std::stod(property);
1097 }
1098
1099 int DBTestBase::TotalTableFiles(int cf, int levels) {
1100 if (levels == -1) {
1101 levels = (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
1102 }
1103 int result = 0;
1104 for (int level = 0; level < levels; level++) {
1105 result += NumTableFilesAtLevel(level, cf);
1106 }
1107 return result;
1108 }
1109
1110 // Return spread of files per level
1111 std::string DBTestBase::FilesPerLevel(int cf) {
1112 int num_levels =
1113 (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
1114 std::string result;
1115 size_t last_non_zero_offset = 0;
1116 for (int level = 0; level < num_levels; level++) {
1117 int f = NumTableFilesAtLevel(level, cf);
1118 char buf[100];
1119 snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
1120 result += buf;
1121 if (f > 0) {
1122 last_non_zero_offset = result.size();
1123 }
1124 }
1125 result.resize(last_non_zero_offset);
1126 return result;
1127 }
1128 #endif // !ROCKSDB_LITE
1129
1130 size_t DBTestBase::CountFiles() {
1131 std::vector<std::string> files;
1132 env_->GetChildren(dbname_, &files);
1133
1134 std::vector<std::string> logfiles;
1135 if (dbname_ != last_options_.wal_dir) {
1136 env_->GetChildren(last_options_.wal_dir, &logfiles);
1137 }
1138
1139 return files.size() + logfiles.size();
1140 }
1141
1142 uint64_t DBTestBase::Size(const Slice& start, const Slice& limit, int cf) {
1143 Range r(start, limit);
1144 uint64_t size;
1145 if (cf == 0) {
1146 db_->GetApproximateSizes(&r, 1, &size);
1147 } else {
1148 db_->GetApproximateSizes(handles_[1], &r, 1, &size);
1149 }
1150 return size;
1151 }
1152
1153 void DBTestBase::Compact(int cf, const Slice& start, const Slice& limit,
1154 uint32_t target_path_id) {
1155 CompactRangeOptions compact_options;
1156 compact_options.target_path_id = target_path_id;
1157 ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
1158 }
1159
1160 void DBTestBase::Compact(int cf, const Slice& start, const Slice& limit) {
1161 ASSERT_OK(
1162 db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
1163 }
1164
1165 void DBTestBase::Compact(const Slice& start, const Slice& limit) {
1166 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
1167 }
1168
1169 // Do n memtable compactions, each of which produces an sstable
1170 // covering the range [small,large].
1171 void DBTestBase::MakeTables(int n, const std::string& small,
1172 const std::string& large, int cf) {
1173 for (int i = 0; i < n; i++) {
1174 ASSERT_OK(Put(cf, small, "begin"));
1175 ASSERT_OK(Put(cf, large, "end"));
1176 ASSERT_OK(Flush(cf));
1177 MoveFilesToLevel(n - i - 1, cf);
1178 }
1179 }
1180
1181 // Prevent pushing of new sstables into deeper levels by adding
1182 // tables that cover a specified range to all levels.
1183 void DBTestBase::FillLevels(const std::string& smallest,
1184 const std::string& largest, int cf) {
1185 MakeTables(db_->NumberLevels(handles_[cf]), smallest, largest, cf);
1186 }
1187
1188 void DBTestBase::MoveFilesToLevel(int level, int cf) {
1189 for (int l = 0; l < level; ++l) {
1190 if (cf > 0) {
1191 EXPECT_OK(dbfull()->TEST_CompactRange(l, nullptr, nullptr, handles_[cf]));
1192 } else {
1193 EXPECT_OK(dbfull()->TEST_CompactRange(l, nullptr, nullptr));
1194 }
1195 }
1196 }
1197
1198 #ifndef ROCKSDB_LITE
1199 void DBTestBase::DumpFileCounts(const char* label) {
1200 fprintf(stderr, "---\n%s:\n", label);
1201 fprintf(stderr, "maxoverlap: %" PRIu64 "\n",
1202 dbfull()->TEST_MaxNextLevelOverlappingBytes());
1203 for (int level = 0; level < db_->NumberLevels(); level++) {
1204 int num = NumTableFilesAtLevel(level);
1205 if (num > 0) {
1206 fprintf(stderr, " level %3d : %d files\n", level, num);
1207 }
1208 }
1209 }
1210 #endif // !ROCKSDB_LITE
1211
1212 std::string DBTestBase::DumpSSTableList() {
1213 std::string property;
1214 db_->GetProperty("rocksdb.sstables", &property);
1215 return property;
1216 }
1217
1218 void DBTestBase::GetSstFiles(Env* env, std::string path,
1219 std::vector<std::string>* files) {
1220 EXPECT_OK(env->GetChildren(path, files));
1221
1222 files->erase(
1223 std::remove_if(files->begin(), files->end(), [](std::string name) {
1224 uint64_t number;
1225 FileType type;
1226 return !(ParseFileName(name, &number, &type) && type == kTableFile);
1227 }), files->end());
1228 }
1229
1230 int DBTestBase::GetSstFileCount(std::string path) {
1231 std::vector<std::string> files;
1232 DBTestBase::GetSstFiles(env_, path, &files);
1233 return static_cast<int>(files.size());
1234 }
1235
1236 // this will generate non-overlapping files since it keeps increasing key_idx
1237 void DBTestBase::GenerateNewFile(int cf, Random* rnd, int* key_idx,
1238 bool nowait) {
1239 for (int i = 0; i < KNumKeysByGenerateNewFile; i++) {
1240 ASSERT_OK(Put(cf, Key(*key_idx), rnd->RandomString((i == 99) ? 1 : 990)));
1241 (*key_idx)++;
1242 }
1243 if (!nowait) {
1244 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1245 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1246 }
1247 }
1248
1249 // this will generate non-overlapping files since it keeps increasing key_idx
1250 void DBTestBase::GenerateNewFile(Random* rnd, int* key_idx, bool nowait) {
1251 for (int i = 0; i < KNumKeysByGenerateNewFile; i++) {
1252 ASSERT_OK(Put(Key(*key_idx), rnd->RandomString((i == 99) ? 1 : 990)));
1253 (*key_idx)++;
1254 }
1255 if (!nowait) {
1256 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1257 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1258 }
1259 }
1260
1261 const int DBTestBase::kNumKeysByGenerateNewRandomFile = 51;
1262
1263 void DBTestBase::GenerateNewRandomFile(Random* rnd, bool nowait) {
1264 for (int i = 0; i < kNumKeysByGenerateNewRandomFile; i++) {
1265 ASSERT_OK(Put("key" + rnd->RandomString(7), rnd->RandomString(2000)));
1266 }
1267 ASSERT_OK(Put("key" + rnd->RandomString(7), rnd->RandomString(200)));
1268 if (!nowait) {
1269 dbfull()->TEST_WaitForFlushMemTable();
1270 dbfull()->TEST_WaitForCompact();
1271 }
1272 }
1273
1274 std::string DBTestBase::IterStatus(Iterator* iter) {
1275 std::string result;
1276 if (iter->Valid()) {
1277 result = iter->key().ToString() + "->" + iter->value().ToString();
1278 } else {
1279 result = "(invalid)";
1280 }
1281 return result;
1282 }
1283
1284 Options DBTestBase::OptionsForLogIterTest() {
1285 Options options = CurrentOptions();
1286 options.create_if_missing = true;
1287 options.WAL_ttl_seconds = 1000;
1288 return options;
1289 }
1290
1291 std::string DBTestBase::DummyString(size_t len, char c) {
1292 return std::string(len, c);
1293 }
1294
1295 void DBTestBase::VerifyIterLast(std::string expected_key, int cf) {
1296 Iterator* iter;
1297 ReadOptions ro;
1298 if (cf == 0) {
1299 iter = db_->NewIterator(ro);
1300 } else {
1301 iter = db_->NewIterator(ro, handles_[cf]);
1302 }
1303 iter->SeekToLast();
1304 ASSERT_EQ(IterStatus(iter), expected_key);
1305 delete iter;
1306 }
1307
1308 // Used to test InplaceUpdate
1309
1310 // If previous value is nullptr or delta is > than previous value,
1311 // sets newValue with delta
1312 // If previous value is not empty,
1313 // updates previous value with 'b' string of previous value size - 1.
1314 UpdateStatus DBTestBase::updateInPlaceSmallerSize(char* prevValue,
1315 uint32_t* prevSize,
1316 Slice delta,
1317 std::string* newValue) {
1318 if (prevValue == nullptr) {
1319 *newValue = std::string(delta.size(), 'c');
1320 return UpdateStatus::UPDATED;
1321 } else {
1322 *prevSize = *prevSize - 1;
1323 std::string str_b = std::string(*prevSize, 'b');
1324 memcpy(prevValue, str_b.c_str(), str_b.size());
1325 return UpdateStatus::UPDATED_INPLACE;
1326 }
1327 }
1328
1329 UpdateStatus DBTestBase::updateInPlaceSmallerVarintSize(char* prevValue,
1330 uint32_t* prevSize,
1331 Slice delta,
1332 std::string* newValue) {
1333 if (prevValue == nullptr) {
1334 *newValue = std::string(delta.size(), 'c');
1335 return UpdateStatus::UPDATED;
1336 } else {
1337 *prevSize = 1;
1338 std::string str_b = std::string(*prevSize, 'b');
1339 memcpy(prevValue, str_b.c_str(), str_b.size());
1340 return UpdateStatus::UPDATED_INPLACE;
1341 }
1342 }
1343
1344 UpdateStatus DBTestBase::updateInPlaceLargerSize(char* /*prevValue*/,
1345 uint32_t* /*prevSize*/,
1346 Slice delta,
1347 std::string* newValue) {
1348 *newValue = std::string(delta.size(), 'c');
1349 return UpdateStatus::UPDATED;
1350 }
1351
1352 UpdateStatus DBTestBase::updateInPlaceNoAction(char* /*prevValue*/,
1353 uint32_t* /*prevSize*/,
1354 Slice /*delta*/,
1355 std::string* /*newValue*/) {
1356 return UpdateStatus::UPDATE_FAILED;
1357 }
1358
1359 // Utility method to test InplaceUpdate
1360 void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
1361 Arena arena;
1362 auto options = CurrentOptions();
1363 InternalKeyComparator icmp(options.comparator);
1364 ReadRangeDelAggregator range_del_agg(&icmp,
1365 kMaxSequenceNumber /* upper_bound */);
1366 // This should be defined after range_del_agg so that it destructs the
1367 // assigned iterator before it range_del_agg is already destructed.
1368 ReadOptions read_options;
1369 ScopedArenaIterator iter;
1370 if (cf != 0) {
1371 iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
1372 kMaxSequenceNumber, handles_[cf]));
1373 } else {
1374 iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
1375 kMaxSequenceNumber));
1376 }
1377 iter->SeekToFirst();
1378 ASSERT_OK(iter->status());
1379 int seq = numValues;
1380 while (iter->Valid()) {
1381 ParsedInternalKey ikey;
1382 ikey.clear();
1383 ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */));
1384
1385 // checks sequence number for updates
1386 ASSERT_EQ(ikey.sequence, (unsigned)seq--);
1387 iter->Next();
1388 }
1389 ASSERT_EQ(0, seq);
1390 }
1391
1392 void DBTestBase::CopyFile(const std::string& source,
1393 const std::string& destination, uint64_t size) {
1394 const EnvOptions soptions;
1395 std::unique_ptr<SequentialFile> srcfile;
1396 ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
1397 std::unique_ptr<WritableFile> destfile;
1398 ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
1399
1400 if (size == 0) {
1401 // default argument means copy everything
1402 ASSERT_OK(env_->GetFileSize(source, &size));
1403 }
1404
1405 char buffer[4096];
1406 Slice slice;
1407 while (size > 0) {
1408 uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
1409 ASSERT_OK(srcfile->Read(one, &slice, buffer));
1410 ASSERT_OK(destfile->Append(slice));
1411 size -= slice.size();
1412 }
1413 ASSERT_OK(destfile->Close());
1414 }
1415
1416 Status DBTestBase::GetAllSSTFiles(
1417 std::unordered_map<std::string, uint64_t>* sst_files,
1418 uint64_t* total_size /* = nullptr */) {
1419 if (total_size) {
1420 *total_size = 0;
1421 }
1422 std::vector<std::string> files;
1423 Status s = env_->GetChildren(dbname_, &files);
1424 if (s.ok()) {
1425 for (auto& file_name : files) {
1426 uint64_t number;
1427 FileType type;
1428 if (ParseFileName(file_name, &number, &type) && type == kTableFile) {
1429 std::string file_path = dbname_ + "/" + file_name;
1430 uint64_t file_size = 0;
1431 s = env_->GetFileSize(file_path, &file_size);
1432 if (!s.ok()) {
1433 break;
1434 }
1435 (*sst_files)[file_path] = file_size;
1436 if (total_size) {
1437 *total_size += file_size;
1438 }
1439 }
1440 }
1441 }
1442 return s;
1443 }
1444
1445 std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
1446 const std::string& path) {
1447 std::vector<std::string> files;
1448 std::vector<uint64_t> file_numbers;
1449 EXPECT_OK(env->GetChildren(path, &files));
1450 uint64_t number;
1451 FileType type;
1452 for (size_t i = 0; i < files.size(); ++i) {
1453 if (ParseFileName(files[i], &number, &type)) {
1454 if (type == kTableFile) {
1455 file_numbers.push_back(number);
1456 }
1457 }
1458 }
1459 return file_numbers;
1460 }
1461
1462 void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
1463 size_t* total_reads_res, bool tailing_iter,
1464 std::map<std::string, Status> status) {
1465 size_t total_reads = 0;
1466
1467 for (auto& kv : true_data) {
1468 Status s = status[kv.first];
1469 if (s.ok()) {
1470 ASSERT_EQ(Get(kv.first), kv.second);
1471 } else {
1472 std::string value;
1473 ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
1474 }
1475 total_reads++;
1476 }
1477
1478 // Normal Iterator
1479 {
1480 int iter_cnt = 0;
1481 ReadOptions ro;
1482 ro.total_order_seek = true;
1483 Iterator* iter = db_->NewIterator(ro);
1484 // Verify Iterator::Next()
1485 iter_cnt = 0;
1486 auto data_iter = true_data.begin();
1487 Status s;
1488 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
1489 ASSERT_EQ(iter->key().ToString(), data_iter->first);
1490 Status current_status = status[data_iter->first];
1491 if (!current_status.ok()) {
1492 s = current_status;
1493 }
1494 ASSERT_EQ(iter->status(), s);
1495 if (current_status.ok()) {
1496 ASSERT_EQ(iter->value().ToString(), data_iter->second);
1497 }
1498 iter_cnt++;
1499 total_reads++;
1500 }
1501 ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
1502 << true_data.size();
1503 delete iter;
1504
1505 // Verify Iterator::Prev()
1506 // Use a new iterator to make sure its status is clean.
1507 iter = db_->NewIterator(ro);
1508 iter_cnt = 0;
1509 s = Status::OK();
1510 auto data_rev = true_data.rbegin();
1511 for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
1512 ASSERT_EQ(iter->key().ToString(), data_rev->first);
1513 Status current_status = status[data_rev->first];
1514 if (!current_status.ok()) {
1515 s = current_status;
1516 }
1517 ASSERT_EQ(iter->status(), s);
1518 if (current_status.ok()) {
1519 ASSERT_EQ(iter->value().ToString(), data_rev->second);
1520 }
1521 iter_cnt++;
1522 total_reads++;
1523 }
1524 ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / "
1525 << true_data.size();
1526
1527 // Verify Iterator::Seek()
1528 for (auto kv : true_data) {
1529 iter->Seek(kv.first);
1530 ASSERT_EQ(kv.first, iter->key().ToString());
1531 ASSERT_EQ(kv.second, iter->value().ToString());
1532 total_reads++;
1533 }
1534 delete iter;
1535 }
1536
1537 if (tailing_iter) {
1538 #ifndef ROCKSDB_LITE
1539 // Tailing iterator
1540 int iter_cnt = 0;
1541 ReadOptions ro;
1542 ro.tailing = true;
1543 ro.total_order_seek = true;
1544 Iterator* iter = db_->NewIterator(ro);
1545
1546 // Verify ForwardIterator::Next()
1547 iter_cnt = 0;
1548 auto data_iter = true_data.begin();
1549 for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
1550 ASSERT_EQ(iter->key().ToString(), data_iter->first);
1551 ASSERT_EQ(iter->value().ToString(), data_iter->second);
1552 iter_cnt++;
1553 total_reads++;
1554 }
1555 ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
1556 << true_data.size();
1557
1558 // Verify ForwardIterator::Seek()
1559 for (auto kv : true_data) {
1560 iter->Seek(kv.first);
1561 ASSERT_EQ(kv.first, iter->key().ToString());
1562 ASSERT_EQ(kv.second, iter->value().ToString());
1563 total_reads++;
1564 }
1565
1566 delete iter;
1567 #endif // ROCKSDB_LITE
1568 }
1569
1570 if (total_reads_res) {
1571 *total_reads_res = total_reads;
1572 }
1573 }
1574
1575 void DBTestBase::VerifyDBInternal(
1576 std::vector<std::pair<std::string, std::string>> true_data) {
1577 Arena arena;
1578 InternalKeyComparator icmp(last_options_.comparator);
1579 ReadRangeDelAggregator range_del_agg(&icmp,
1580 kMaxSequenceNumber /* upper_bound */);
1581 ReadOptions read_options;
1582 auto iter = dbfull()->NewInternalIterator(read_options, &arena,
1583 &range_del_agg, kMaxSequenceNumber);
1584 iter->SeekToFirst();
1585 for (auto p : true_data) {
1586 ASSERT_TRUE(iter->Valid());
1587 ParsedInternalKey ikey;
1588 ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */));
1589 ASSERT_EQ(p.first, ikey.user_key);
1590 ASSERT_EQ(p.second, iter->value());
1591 iter->Next();
1592 };
1593 ASSERT_FALSE(iter->Valid());
1594 iter->~InternalIterator();
1595 }
1596
1597 #ifndef ROCKSDB_LITE
1598
1599 uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
1600 DB* db, std::string column_family_name) {
1601 std::vector<LiveFileMetaData> metadata;
1602 db->GetLiveFilesMetaData(&metadata);
1603 uint64_t result = 0;
1604 for (auto& fileMetadata : metadata) {
1605 result += (fileMetadata.column_family_name == column_family_name);
1606 }
1607 return result;
1608 }
1609 #endif // ROCKSDB_LITE
1610
1611 } // namespace ROCKSDB_NAMESPACE