1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #ifndef __STDC_FORMAT_MACROS
7 #define __STDC_FORMAT_MACROS
10 #if !defined(GFLAGS) || defined(ROCKSDB_LITE)
13 fprintf(stderr
, "Please install gflags to run rocksdb tools\n");
16 #elif defined(OS_MACOSX) || defined(OS_WIN)
17 // Block forward_iterator_bench under MAC and Windows
// Benchmark is not supported on macOS / Windows: build a trivial no-op
// binary instead so the target still links on those platforms.
int main() { return 0; }
20 #include <semaphore.h>
25 #include <condition_variable>
32 #include "port/port.h"
33 #include "rocksdb/cache.h"
34 #include "rocksdb/db.h"
35 #include "rocksdb/status.h"
36 #include "rocksdb/table.h"
37 #include "util/gflags_compat.h"
38 #include "util/testharness.h"
40 const int MAX_SHARDS
= 100000;
42 DEFINE_int32(writers
, 8, "");
43 DEFINE_int32(readers
, 8, "");
44 DEFINE_int64(rate
, 100000, "");
45 DEFINE_int64(value_size
, 300, "");
46 DEFINE_int64(shards
, 1000, "");
47 DEFINE_int64(memtable_size
, 500000000, "");
48 DEFINE_int64(block_cache_size
, 300000000, "");
49 DEFINE_int64(block_size
, 65536, "");
50 DEFINE_double(runtime
, 300.0, "");
51 DEFINE_bool(cache_only_first
, true, "");
52 DEFINE_bool(iterate_upper_bound
, true, "");
// NOTE(review): interior of a file-scope stats aggregate (referenced
// elsewhere as ::stats); the enclosing declaration and closing brace are not
// visible in this fragment, and the leading numerals (55, 56, ...) are
// artifacts of a numbered-listing extraction — confirm against upstream.
// The 128-byte pad arrays appear intended to keep each atomic counter on a
// separate cache line (avoid false sharing between threads) — TODO confirm.
55 char pad1
[128] __attribute__((__unused__
));
// Counter read by the stats reporter as ::stats.written (incremented by code
// not visible in this fragment).
56 std::atomic
<uint64_t> written
{0};
57 char pad2
[128] __attribute__((__unused__
));
// Counter read by the stats reporter as ::stats.read.
58 std::atomic
<uint64_t> read
{0};
// Incremented in readOnceFromShard() when an iterator reports an Incomplete
// status (block-cache-only read missed the cache).
59 std::atomic
<uint64_t> cache_misses
{0};
60 char pad3
[128] __attribute__((__unused__
));
// NOTE(review): fragment of the packed Key struct; the struct header and the
// shard_be/seqno_be member declarations are missing from this extraction,
// and the leading numerals are listing artifacts.
// Fields are stored big-endian (htobe64) so that raw byte-wise comparison of
// a serialized Key orders entries by (shard, seqno) — matches the Seek()
// calls elsewhere that pass (const char*)&key / sizeof(key) as a Slice.
65 Key(uint64_t shard_in
, uint64_t seqno_in
)
66 : shard_be(htobe64(shard_in
)), seqno_be(htobe64(seqno_in
)) {}
// Accessors decode back to host byte order.
68 uint64_t shard() const { return be64toh(shard_be
); }
69 uint64_t seqno() const { return be64toh(seqno_be
); }
// __packed__ removes padding so the in-memory layout is exactly the encoded
// fields — presumably 16 bytes; confirm against the member declarations.
74 } __attribute__((__packed__
));
// NOTE(review): interior of the per-shard state struct (header not visible;
// leading numerals are listing artifacts). Members referenced elsewhere —
// the reader/writer pointers and the upper_bound Key — are also elided from
// this fragment.
80 char pad1
[128] __attribute__((__unused__
));
// Highest seqno Put() into this shard; stored by the writer loop and loaded
// by readOnceFromShard() to know how far the reader may catch up.
81 std::atomic
<uint64_t> last_written
{0};
84 char pad2
[128] __attribute__((__unused__
));
// Highest seqno the reader has verified for this shard.
85 std::atomic
<uint64_t> last_read
{0};
// Tailing iterator over the shard (created in readOnceFromShard()).
86 std::unique_ptr
<rocksdb::Iterator
> it
;
// Same, but opened with read_tier = kBlockCacheTier (block-cache only).
87 std::unique_ptr
<rocksdb::Iterator
> it_cacheonly
;
// Backing storage for ReadOptions::iterate_upper_bound; kept in shard state
// so it outlives the iterators that point at it.
89 rocksdb::Slice upper_bound_slice
;
90 char pad3
[128] __attribute__((__unused__
));
// Constructs a Reader: records non-owning pointers to the shared shard-state
// table and the DB, initializes the work semaphore to 0 (no pending work),
// and immediately launches the reader thread.
// NOTE(review): the constructor's closing brace is missing from this
// fragment (extraction gap); leading numerals are listing artifacts.
95 explicit Reader(std::vector
<ShardState
>* shard_states
, rocksdb::DB
* db
)
96 : shard_states_(shard_states
), db_(db
) {
97 sem_init(&sem_
, 0, 0);
98 thread_
= port::Thread(&Reader::run
, this);
// NOTE(review): interior of the reader loop — the enclosing run() signature
// and the lines preceding this critical section (presumably a sem_wait on
// sem_; confirm upstream) are missing from this fragment.
// Under queue_mutex_: pop the next shard with pending writes and clear its
// membership bit so onWrite() may enqueue it again later.
110 std::lock_guard
<std::mutex
> guard(queue_mutex_
);
111 assert(!shards_pending_queue_
.empty());
112 shard
= shards_pending_queue_
.front();
113 shards_pending_queue_
.pop();
114 shards_pending_set_
.reset(shard
);
// Service the shard; scope boundaries around the lock are not visible here.
116 readOnceFromShard(shard
);
// Catch the reader up on one shard: (re)build tailing iterators, then Seek
// and verify every key from last_read+1 up to the writer's last_written.
// NOTE(review): many interior lines (iterator-reuse checks, the Next()/seek
// alternation, closing braces, and the read-counter update) are missing from
// this fragment; leading numerals are listing artifacts.
120 void readOnceFromShard(uint64_t shard
) {
121 ShardState
& state
= (*shard_states_
)[shard
];
123 // Initialize iterators
124 rocksdb::ReadOptions options
;
// Tailing iterators can observe data written after iterator creation.
125 options
.tailing
= true;
126 if (FLAGS_iterate_upper_bound
) {
// Bound iteration to this shard: upper bound key is (shard, max seqno).
127 state
.upper_bound
= Key(shard
, std::numeric_limits
<uint64_t>::max());
128 state
.upper_bound_slice
= rocksdb::Slice(
129 (const char*)&state
.upper_bound
, sizeof(state
.upper_bound
));
// iterate_upper_bound stores a pointer — upper_bound_slice lives in the
// shard state so it outlives the iterator.
130 options
.iterate_upper_bound
= &state
.upper_bound_slice
;
133 state
.it
.reset(db_
->NewIterator(options
));
135 if (FLAGS_cache_only_first
) {
// Second iterator restricted to the block cache (kBlockCacheTier); the
// loop below consults it before the unrestricted iterator.
136 options
.read_tier
= rocksdb::ReadTier::kBlockCacheTier
;
137 state
.it_cacheonly
.reset(db_
->NewIterator(options
));
// Snapshot the writer's high-water mark; only read up to this point.
141 const uint64_t upto
= state
.last_written
.load();
// Try the cache-only iterator first, then the regular one.
142 for (rocksdb::Iterator
* it
: {state
.it_cacheonly
.get(), state
.it
.get()}) {
146 if (state
.last_read
.load() >= upto
) {
149 bool need_seek
= true;
150 for (uint64_t seq
= state
.last_read
.load() + 1; seq
<= upto
; ++seq
) {
// Position at the next expected (shard, seq) key, serialized raw.
152 Key
from(shard
, state
.last_read
.load() + 1);
153 it
->Seek(rocksdb::Slice((const char*)&from
, sizeof(from
)));
// Incomplete status means the cache-only tier missed the block cache.
158 if (it
->status().IsIncomplete()) {
159 ++::stats
.cache_misses
;
// Verify the key read back is exactly the (shard, seq) expected.
163 assert(it
->key().size() == sizeof(Key
));
165 memcpy(&key
, it
->key().data(), it
->key().size());
166 // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
167 // shard, seq, key.shard(), key.seqno());
168 assert(key
.shard() == shard
);
169 assert(key
.seqno() == seq
);
170 state
.last_read
.store(seq
);
// Called by a Writer after each Put(): marks `shard` as having pending data
// for this reader. The bitset dedupes enqueues so a shard sits in the queue
// at most once.
// NOTE(review): the fragment ends before the closing braces; presumably a
// sem_post(&sem_) follows to wake the reader thread (sem_ is initialized in
// the constructor) — confirm upstream.
176 void onWrite(uint64_t shard
) {
178 std::lock_guard
<std::mutex
> guard(queue_mutex_
);
179 if (!shards_pending_set_
.test(shard
)) {
180 shards_pending_queue_
.push(shard
);
181 shards_pending_set_
.set(shard
);
// NOTE(review): Reader's data members (fragment — access specifier and some
// members, e.g. the semaphore sem_ used by the constructor, are not
// visible). The pad arrays bracket the mutable state, presumably to isolate
// it on separate cache lines — confirm.
194 char pad1
[128] __attribute__((__unused__
));
// Shared per-shard state table, owned by main(); non-owning here.
195 std::vector
<ShardState
>* shard_states_
;
197 rocksdb::port::Thread thread_
;
// Protects the pending-shard queue and bitset below.
199 std::mutex queue_mutex_
;
// Membership bitset deduping queue entries; sized MAX_SHARDS + 1 because
// shard ids are 1-based (see the 1..FLAGS_shards loops elsewhere).
200 std::bitset
<MAX_SHARDS
+ 1> shards_pending_set_
;
201 std::queue
<uint64_t> shards_pending_queue_
;
202 std::atomic
<bool> done_
{false};
203 char pad2
[128] __attribute__((__unused__
));
207 explicit Writer(std::vector
<ShardState
>* shard_states
, rocksdb::DB
* db
)
208 : shard_states_(shard_states
), db_(db
) {}
210 void start() { thread_
= port::Thread(&Writer::run
, this); }
// NOTE(review): body of Writer::run() — the signature, outer loop header,
// several braces, and any trailing counter update are missing from this
// fragment; leading numerals are listing artifacts.
// Work queue of absolute timestamps, one per scheduled write.
213 std::queue
<std::chrono::steady_clock::time_point
> workq
;
// Absolute deadline after which this writer stops (FLAGS_runtime seconds).
214 std::chrono::steady_clock::time_point
deadline(
215 std::chrono::steady_clock::now() +
216 std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime
)));
// Collect the shards main() assigned to this writer.
217 std::vector
<uint64_t> my_shards
;
218 for (int i
= 1; i
<= FLAGS_shards
; ++i
) {
219 if ((*shard_states_
)[i
].writer
== this) {
220 my_shards
.push_back(i
);
224 std::mt19937 rng
{std::random_device()()};
225 std::uniform_int_distribution
<int> shard_dist(
226 0, static_cast<int>(my_shards
.size()) - 1);
// All writes share one payload: FLAGS_value_size bytes of '*'.
227 std::string
value(FLAGS_value_size
, '*');
230 auto now
= std::chrono::steady_clock::now();
// A negative FLAGS_runtime disables the deadline check entirely.
231 if (FLAGS_runtime
>= 0 && now
>= deadline
) {
// Schedule this writer's share of FLAGS_rate writes/sec, evenly spaced:
// each of the FLAGS_writers threads takes every FLAGS_writers-th slot.
235 for (int i
= 0; i
< FLAGS_rate
; i
+= FLAGS_writers
) {
236 std::chrono::nanoseconds
offset(1000000000LL * i
/ FLAGS_rate
);
237 workq
.push(now
+ offset
);
// Issue every write whose scheduled time has already passed.
240 while (!workq
.empty() && workq
.front() < now
) {
// Pick a random owned shard; its next seqno is last_written + 1.
242 uint64_t shard
= my_shards
[shard_dist(rng
)];
243 ShardState
& state
= (*shard_states_
)[shard
];
244 uint64_t seqno
= state
.last_written
.load() + 1;
245 Key
key(shard
, seqno
);
246 // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
247 rocksdb::Status status
=
248 db_
->Put(rocksdb::WriteOptions(),
249 rocksdb::Slice((const char*)&key
, sizeof(key
)),
250 rocksdb::Slice(value
));
// Publish the new high-water mark, then notify the shard's reader.
252 state
.last_written
.store(seqno
);
253 state
.reader
->onWrite(shard
);
// Throttle: nap briefly before checking the schedule again.
256 std::this_thread::sleep_for(std::chrono::milliseconds(1));
258 // fprintf(stderr, "Writer done\n");
261 ~Writer() { thread_
.join(); }
// NOTE(review): Writer's data members (fragment — access specifier and db_
// are not visible). Pad arrays presumably isolate this object's state on
// separate cache lines — confirm.
264 char pad1
[128] __attribute__((__unused__
));
// Shared per-shard state table, owned by main(); non-owning here.
265 std::vector
<ShardState
>* shard_states_
;
267 rocksdb::port::Thread thread_
;
268 char pad2
[128] __attribute__((__unused__
));
272 explicit StatsThread(rocksdb::DB
* db
)
273 : db_(db
), thread_(&StatsThread::run
, this) {}
// NOTE(review): body of StatsThread::run() — the signature, the opening of
// the fprintf report call, the `elapsed` declaration, and the *last updates
// at loop end are missing from this fragment; leading numerals are listing
// artifacts. Roughly once per second (or when woken early at shutdown via
// cv_) it reports cumulative totals and per-interval rates.
276 // using namespace std::chrono;
277 auto tstart
= std::chrono::steady_clock::now(), tlast
= tstart
;
// Previous interval's totals, for computing per-second deltas.
278 uint64_t wlast
= 0, rlast
= 0;
279 while (!done_
.load()) {
// Sleep up to 1s; the condition variable lets shutdown wake the loop.
281 std::unique_lock
<std::mutex
> lock(cvm_
);
282 cv_
.wait_for(lock
, std::chrono::seconds(1));
284 auto now
= std::chrono::steady_clock::now();
// Seconds since the previous report, as a double (assigned to `elapsed`;
// the declaration line is one of the missing lines).
286 std::chrono::duration_cast
<std::chrono::duration
<double> >(
287 now
- tlast
).count();
288 uint64_t w
= ::stats
.written
.load();
289 uint64_t r
= ::stats
.read
.load();
// Format string and arguments of the report; the fprintf this belongs to
// is missing its opening line in this fragment.
291 "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
292 "r/s %10.0f | cache misses %10ld\n",
293 db_
->GetEnv()->TimeToString(time(nullptr)).c_str(),
294 std::chrono::duration_cast
<std::chrono::seconds
>(now
- tstart
)
296 w
, (w
- wlast
) / elapsed
, r
, (r
- rlast
) / elapsed
,
297 ::stats
.cache_misses
.load());
// NOTE(review): isolated fragment, apparently from the StatsThread
// destructor/shutdown path: briefly acquires cvm_ — presumably so a
// subsequent done_/notify cannot race the reporter's wait_for — the
// surrounding lines are missing; confirm upstream.
306 std::lock_guard
<std::mutex
> guard(cvm_
);
// NOTE(review): StatsThread data members (fragment — db_ and the mutex cvm_
// used with cv_ above are not visible in this extraction).
// Wakes the reporting loop early at shutdown (paired with cvm_).
316 std::condition_variable cv_
;
317 rocksdb::port::Thread thread_
;
// Set at shutdown to terminate run()'s reporting loop.
318 std::atomic
<bool> done_
{false};
// Benchmark driver: configures a compaction-free, uncompressed DB tuned for
// a tailing-iterator workload, spins up reader and writer threads, randomly
// assigns one reader and one writer to every shard, then starts the stats
// reporter and the writers.
// NOTE(review): several lines are missing from this fragment (the
// declaration of db_raw, status checks, loop closers, w.start() and the tail
// of main); leading numerals are listing artifacts.
321 int main(int argc
, char** argv
) {
322 GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc
, &argv
, true);
324 std::mt19937 rng
{std::random_device()()};
325 rocksdb::Status status
;
326 std::string path
= rocksdb::test::PerThreadDBPath("forward_iterator_test");
327 fprintf(stderr
, "db path is %s\n", path
.c_str());
328 rocksdb::Options options
;
329 options
.create_if_missing
= true;
// No compression and (effectively) no compaction: the huge level0 triggers
// below keep background work from interfering with the measurement.
330 options
.compression
= rocksdb::CompressionType::kNoCompression
;
331 options
.compaction_style
= rocksdb::CompactionStyle::kCompactionStyleNone
;
332 options
.level0_slowdown_writes_trigger
= 99999;
333 options
.level0_stop_writes_trigger
= 99999;
334 options
.use_direct_io_for_flush_and_compaction
= true;
335 options
.write_buffer_size
= FLAGS_memtable_size
;
336 rocksdb::BlockBasedTableOptions table_options
;
337 table_options
.block_cache
= rocksdb::NewLRUCache(FLAGS_block_cache_size
);
338 table_options
.block_size
= FLAGS_block_size
;
339 options
.table_factory
.reset(
340 rocksdb::NewBlockBasedTableFactory(table_options
));
// Start from a clean database directory each run.
342 status
= rocksdb::DestroyDB(path
, options
);
// db_raw is declared on a line missing from this fragment.
345 status
= rocksdb::DB::Open(options
, path
, &db_raw
);
// The unique_ptr takes ownership; readers/writers keep the raw pointer.
347 std::unique_ptr
<rocksdb::DB
> db(db_raw
);
// Index 0 is unused: shard ids run 1..FLAGS_shards.
349 std::vector
<ShardState
> shard_states(FLAGS_shards
+ 1);
// deque rather than vector — presumably because Reader/Writer own threads
// and are not movable, and their addresses are stored below; confirm.
350 std::deque
<Reader
> readers
;
351 while (static_cast<int>(readers
.size()) < FLAGS_readers
) {
352 readers
.emplace_back(&shard_states
, db_raw
);
354 std::deque
<Writer
> writers
;
355 while (static_cast<int>(writers
.size()) < FLAGS_writers
) {
356 writers
.emplace_back(&shard_states
, db_raw
);
359 // Each shard gets a random reader and random writer assigned to it
360 for (int i
= 1; i
<= FLAGS_shards
; ++i
) {
361 std::uniform_int_distribution
<int> reader_dist(0, FLAGS_readers
- 1);
362 std::uniform_int_distribution
<int> writer_dist(0, FLAGS_writers
- 1);
363 shard_states
[i
].reader
= &readers
[reader_dist(rng
)];
364 shard_states
[i
].writer
= &writers
[writer_dist(rng
)];
// Reporter starts here; writer threads are started in the loop below
// (its body, presumably w.start(), is missing from this fragment).
367 StatsThread
stats_thread(db_raw
);
368 for (Writer
& w
: writers
) {
375 #endif // !defined(GFLAGS) || defined(ROCKSDB_LITE)