]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/forward_iterator_bench.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / forward_iterator_bench.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae 5
7c673cae
FG
6#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
7#include <cstdio>
8int main() {
9 fprintf(stderr, "Please install gflags to run rocksdb tools\n");
10 return 1;
11}
12#elif defined(OS_MACOSX) || defined(OS_WIN)
13// Block forward_iterator_bench under MAC and Windows
14int main() { return 0; }
15#else
7c673cae
FG
#include <semaphore.h>

#include <atomic>
#include <bitset>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <deque>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "test_util/testharness.h"
#include "util/gflags_compat.h"
7c673cae
FG
35
36const int MAX_SHARDS = 100000;
37
38DEFINE_int32(writers, 8, "");
39DEFINE_int32(readers, 8, "");
40DEFINE_int64(rate, 100000, "");
41DEFINE_int64(value_size, 300, "");
42DEFINE_int64(shards, 1000, "");
43DEFINE_int64(memtable_size, 500000000, "");
44DEFINE_int64(block_cache_size, 300000000, "");
45DEFINE_int64(block_size, 65536, "");
46DEFINE_double(runtime, 300.0, "");
47DEFINE_bool(cache_only_first, true, "");
48DEFINE_bool(iterate_upper_bound, true, "");
49
// Process-wide counters shared by all threads: writers bump `written`,
// readers bump `read` and `cache_misses`, and StatsThread prints them.
// The 128-byte pad arrays are never accessed; they separate the writer-side
// counter from the reader-side ones, presumably to avoid false sharing.
struct Stats {
  __attribute__((__unused__)) char pad1[128];
  std::atomic<uint64_t> written{0};

  __attribute__((__unused__)) char pad2[128];
  std::atomic<uint64_t> read{0};
  std::atomic<uint64_t> cache_misses{0};

  __attribute__((__unused__)) char pad3[128];
};

Stats stats;
58
// Fixed 16-byte key: shard id followed by a per-shard sequence number, both
// stored big-endian. Presumably chosen so that RocksDB's default bytewise
// comparator orders keys by (shard, seqno), which the readers' Seek/Next
// scans depend on.
struct Key {
 public:
  // Leaves both fields uninitialized; callers fill the key by memcpy or
  // assignment.
  Key() {}

  Key(uint64_t shard_in, uint64_t seqno_in) {
    shard_be = htobe64(shard_in);
    seqno_be = htobe64(seqno_in);
  }

  // Shard id in host byte order.
  uint64_t shard() const {
    const uint64_t host_order = be64toh(shard_be);
    return host_order;
  }

  // Sequence number in host byte order.
  uint64_t seqno() const {
    const uint64_t host_order = be64toh(seqno_be);
    return host_order;
  }

 private:
  uint64_t shard_be;
  uint64_t seqno_be;
} __attribute__((__packed__));
71
72struct Reader;
73struct Writer;
74
75struct ShardState {
76 char pad1[128] __attribute__((__unused__));
77 std::atomic<uint64_t> last_written{0};
78 Writer* writer;
79 Reader* reader;
80 char pad2[128] __attribute__((__unused__));
81 std::atomic<uint64_t> last_read{0};
f67539c2
TL
82 std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it;
83 std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it_cacheonly;
7c673cae 84 Key upper_bound;
f67539c2 85 ROCKSDB_NAMESPACE::Slice upper_bound_slice;
7c673cae
FG
86 char pad3[128] __attribute__((__unused__));
87};
88
89struct Reader {
90 public:
f67539c2
TL
91 explicit Reader(std::vector<ShardState>* shard_states,
92 ROCKSDB_NAMESPACE::DB* db)
7c673cae
FG
93 : shard_states_(shard_states), db_(db) {
94 sem_init(&sem_, 0, 0);
95 thread_ = port::Thread(&Reader::run, this);
96 }
97
98 void run() {
99 while (1) {
100 sem_wait(&sem_);
101 if (done_.load()) {
102 break;
103 }
104
105 uint64_t shard;
106 {
107 std::lock_guard<std::mutex> guard(queue_mutex_);
108 assert(!shards_pending_queue_.empty());
109 shard = shards_pending_queue_.front();
110 shards_pending_queue_.pop();
111 shards_pending_set_.reset(shard);
112 }
113 readOnceFromShard(shard);
114 }
115 }
116
117 void readOnceFromShard(uint64_t shard) {
118 ShardState& state = (*shard_states_)[shard];
119 if (!state.it) {
120 // Initialize iterators
f67539c2 121 ROCKSDB_NAMESPACE::ReadOptions options;
7c673cae
FG
122 options.tailing = true;
123 if (FLAGS_iterate_upper_bound) {
124 state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
f67539c2 125 state.upper_bound_slice = ROCKSDB_NAMESPACE::Slice(
7c673cae
FG
126 (const char*)&state.upper_bound, sizeof(state.upper_bound));
127 options.iterate_upper_bound = &state.upper_bound_slice;
128 }
129
130 state.it.reset(db_->NewIterator(options));
131
132 if (FLAGS_cache_only_first) {
f67539c2 133 options.read_tier = ROCKSDB_NAMESPACE::ReadTier::kBlockCacheTier;
7c673cae
FG
134 state.it_cacheonly.reset(db_->NewIterator(options));
135 }
136 }
137
138 const uint64_t upto = state.last_written.load();
f67539c2
TL
139 for (ROCKSDB_NAMESPACE::Iterator* it :
140 {state.it_cacheonly.get(), state.it.get()}) {
7c673cae
FG
141 if (it == nullptr) {
142 continue;
143 }
144 if (state.last_read.load() >= upto) {
145 break;
146 }
147 bool need_seek = true;
148 for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
149 if (need_seek) {
150 Key from(shard, state.last_read.load() + 1);
f67539c2 151 it->Seek(ROCKSDB_NAMESPACE::Slice((const char*)&from, sizeof(from)));
7c673cae
FG
152 need_seek = false;
153 } else {
154 it->Next();
155 }
156 if (it->status().IsIncomplete()) {
157 ++::stats.cache_misses;
158 break;
159 }
160 assert(it->Valid());
161 assert(it->key().size() == sizeof(Key));
162 Key key;
163 memcpy(&key, it->key().data(), it->key().size());
164 // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
165 // shard, seq, key.shard(), key.seqno());
166 assert(key.shard() == shard);
167 assert(key.seqno() == seq);
168 state.last_read.store(seq);
169 ++::stats.read;
170 }
171 }
172 }
173
174 void onWrite(uint64_t shard) {
175 {
176 std::lock_guard<std::mutex> guard(queue_mutex_);
177 if (!shards_pending_set_.test(shard)) {
178 shards_pending_queue_.push(shard);
179 shards_pending_set_.set(shard);
180 sem_post(&sem_);
181 }
182 }
183 }
184
185 ~Reader() {
186 done_.store(true);
187 sem_post(&sem_);
188 thread_.join();
189 }
190
191 private:
192 char pad1[128] __attribute__((__unused__));
193 std::vector<ShardState>* shard_states_;
f67539c2
TL
194 ROCKSDB_NAMESPACE::DB* db_;
195 ROCKSDB_NAMESPACE::port::Thread thread_;
7c673cae
FG
196 sem_t sem_;
197 std::mutex queue_mutex_;
198 std::bitset<MAX_SHARDS + 1> shards_pending_set_;
199 std::queue<uint64_t> shards_pending_queue_;
200 std::atomic<bool> done_{false};
201 char pad2[128] __attribute__((__unused__));
202};
203
204struct Writer {
f67539c2
TL
205 explicit Writer(std::vector<ShardState>* shard_states,
206 ROCKSDB_NAMESPACE::DB* db)
7c673cae
FG
207 : shard_states_(shard_states), db_(db) {}
208
209 void start() { thread_ = port::Thread(&Writer::run, this); }
210
211 void run() {
212 std::queue<std::chrono::steady_clock::time_point> workq;
213 std::chrono::steady_clock::time_point deadline(
214 std::chrono::steady_clock::now() +
215 std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
216 std::vector<uint64_t> my_shards;
217 for (int i = 1; i <= FLAGS_shards; ++i) {
218 if ((*shard_states_)[i].writer == this) {
219 my_shards.push_back(i);
220 }
221 }
222
223 std::mt19937 rng{std::random_device()()};
224 std::uniform_int_distribution<int> shard_dist(
225 0, static_cast<int>(my_shards.size()) - 1);
226 std::string value(FLAGS_value_size, '*');
227
228 while (1) {
229 auto now = std::chrono::steady_clock::now();
230 if (FLAGS_runtime >= 0 && now >= deadline) {
231 break;
232 }
233 if (workq.empty()) {
234 for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
235 std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
236 workq.push(now + offset);
237 }
238 }
239 while (!workq.empty() && workq.front() < now) {
240 workq.pop();
241 uint64_t shard = my_shards[shard_dist(rng)];
242 ShardState& state = (*shard_states_)[shard];
243 uint64_t seqno = state.last_written.load() + 1;
244 Key key(shard, seqno);
245 // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
f67539c2
TL
246 ROCKSDB_NAMESPACE::Status status =
247 db_->Put(ROCKSDB_NAMESPACE::WriteOptions(),
248 ROCKSDB_NAMESPACE::Slice((const char*)&key, sizeof(key)),
249 ROCKSDB_NAMESPACE::Slice(value));
7c673cae
FG
250 assert(status.ok());
251 state.last_written.store(seqno);
252 state.reader->onWrite(shard);
253 ++::stats.written;
254 }
255 std::this_thread::sleep_for(std::chrono::milliseconds(1));
256 }
257 // fprintf(stderr, "Writer done\n");
258 }
259
260 ~Writer() { thread_.join(); }
261
262 private:
263 char pad1[128] __attribute__((__unused__));
264 std::vector<ShardState>* shard_states_;
f67539c2
TL
265 ROCKSDB_NAMESPACE::DB* db_;
266 ROCKSDB_NAMESPACE::port::Thread thread_;
7c673cae
FG
267 char pad2[128] __attribute__((__unused__));
268};
269
270struct StatsThread {
f67539c2 271 explicit StatsThread(ROCKSDB_NAMESPACE::DB* db)
7c673cae
FG
272 : db_(db), thread_(&StatsThread::run, this) {}
273
274 void run() {
275 // using namespace std::chrono;
276 auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
277 uint64_t wlast = 0, rlast = 0;
278 while (!done_.load()) {
279 {
280 std::unique_lock<std::mutex> lock(cvm_);
281 cv_.wait_for(lock, std::chrono::seconds(1));
282 }
283 auto now = std::chrono::steady_clock::now();
284 double elapsed =
285 std::chrono::duration_cast<std::chrono::duration<double> >(
286 now - tlast).count();
287 uint64_t w = ::stats.written.load();
288 uint64_t r = ::stats.read.load();
289 fprintf(stderr,
290 "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
291 "r/s %10.0f | cache misses %10ld\n",
292 db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
293 std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
294 .count(),
295 w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
296 ::stats.cache_misses.load());
297 wlast = w;
298 rlast = r;
299 tlast = now;
300 }
301 }
302
303 ~StatsThread() {
304 {
305 std::lock_guard<std::mutex> guard(cvm_);
306 done_.store(true);
307 }
308 cv_.notify_all();
309 thread_.join();
310 }
311
312 private:
f67539c2 313 ROCKSDB_NAMESPACE::DB* db_;
7c673cae
FG
314 std::mutex cvm_;
315 std::condition_variable cv_;
f67539c2 316 ROCKSDB_NAMESPACE::port::Thread thread_;
7c673cae
FG
317 std::atomic<bool> done_{false};
318};
319
320int main(int argc, char** argv) {
11fdf7f2 321 GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
7c673cae
FG
322
323 std::mt19937 rng{std::random_device()()};
f67539c2
TL
324 ROCKSDB_NAMESPACE::Status status;
325 std::string path =
326 ROCKSDB_NAMESPACE::test::PerThreadDBPath("forward_iterator_test");
7c673cae 327 fprintf(stderr, "db path is %s\n", path.c_str());
f67539c2 328 ROCKSDB_NAMESPACE::Options options;
7c673cae 329 options.create_if_missing = true;
f67539c2
TL
330 options.compression = ROCKSDB_NAMESPACE::CompressionType::kNoCompression;
331 options.compaction_style =
332 ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleNone;
7c673cae
FG
333 options.level0_slowdown_writes_trigger = 99999;
334 options.level0_stop_writes_trigger = 99999;
335 options.use_direct_io_for_flush_and_compaction = true;
336 options.write_buffer_size = FLAGS_memtable_size;
f67539c2
TL
337 ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
338 table_options.block_cache =
339 ROCKSDB_NAMESPACE::NewLRUCache(FLAGS_block_cache_size);
7c673cae
FG
340 table_options.block_size = FLAGS_block_size;
341 options.table_factory.reset(
f67539c2 342 ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(table_options));
7c673cae 343
f67539c2 344 status = ROCKSDB_NAMESPACE::DestroyDB(path, options);
7c673cae 345 assert(status.ok());
f67539c2
TL
346 ROCKSDB_NAMESPACE::DB* db_raw;
347 status = ROCKSDB_NAMESPACE::DB::Open(options, path, &db_raw);
7c673cae 348 assert(status.ok());
f67539c2 349 std::unique_ptr<ROCKSDB_NAMESPACE::DB> db(db_raw);
7c673cae
FG
350
351 std::vector<ShardState> shard_states(FLAGS_shards + 1);
352 std::deque<Reader> readers;
353 while (static_cast<int>(readers.size()) < FLAGS_readers) {
354 readers.emplace_back(&shard_states, db_raw);
355 }
356 std::deque<Writer> writers;
357 while (static_cast<int>(writers.size()) < FLAGS_writers) {
358 writers.emplace_back(&shard_states, db_raw);
359 }
360
361 // Each shard gets a random reader and random writer assigned to it
362 for (int i = 1; i <= FLAGS_shards; ++i) {
363 std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
364 std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
365 shard_states[i].reader = &readers[reader_dist(rng)];
366 shard_states[i].writer = &writers[writer_dist(rng)];
367 }
368
369 StatsThread stats_thread(db_raw);
370 for (Writer& w : writers) {
371 w.start();
372 }
373
374 writers.clear();
375 readers.clear();
376}
377#endif // !defined(GFLAGS) || defined(ROCKSDB_LITE)