// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
#include <cstdio>
int main() {
  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
  return 1;
}
#elif defined(OS_MACOSX) || defined(OS_WIN)
// Block forward_iterator_bench under MAC and Windows
int main() { return 0; }
#else
#include <semaphore.h>
#include <atomic>
#include <bitset>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <limits>
#include <mutex>
#include <queue>
#include <random>
#include <thread>

#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "util/gflags_compat.h"
#include "util/testharness.h"

const int MAX_SHARDS = 100000;

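// Benchmark knobs (gflags). Example invocation, assuming the build target is
// named forward_iterator_bench:
//   ./forward_iterator_bench --writers=8 --readers=8 --rate=100000 \
//       --shards=1000 --runtime=60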
DEFINE_int32(writers, 8, "");
DEFINE_int32(readers, 8, "");
DEFINE_int64(rate, 100000, "");
DEFINE_int64(value_size, 300, "");
DEFINE_int64(shards, 1000, "");
DEFINE_int64(memtable_size, 500000000, "");
DEFINE_int64(block_cache_size, 300000000, "");
DEFINE_int64(block_size, 65536, "");
DEFINE_double(runtime, 300.0, "");
DEFINE_bool(cache_only_first, true, "");
DEFINE_bool(iterate_upper_bound, true, "");

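// Global counters shared by all threads. The 128-byte pads are presumably
// there to keep the hot atomics on separate cache lines and avoid false
// sharing between writer and reader threads.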
struct Stats {
  char pad1[128] __attribute__((__unused__));
  std::atomic<uint64_t> written{0};
  char pad2[128] __attribute__((__unused__));
  std::atomic<uint64_t> read{0};
  std::atomic<uint64_t> cache_misses{0};
  char pad3[128] __attribute__((__unused__));
} stats;

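// Keys are (shard, seqno) pairs stored big-endian and packed, so RocksDB's
// default bytewise comparator orders them first by shard and then by
// sequence number within a shard.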
struct Key {
  Key() {}
  Key(uint64_t shard_in, uint64_t seqno_in)
      : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}

  uint64_t shard() const { return be64toh(shard_be); }
  uint64_t seqno() const { return be64toh(seqno_be); }

 private:
  uint64_t shard_be;
  uint64_t seqno_be;
} __attribute__((__packed__));

struct Reader;
struct Writer;

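// Per-shard bookkeeping: the highest sequence number written and read so far,
// the tailing iterators used by the reader, and an optional upper bound that
// stops iteration at the end of the shard's key range.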
struct ShardState {
  char pad1[128] __attribute__((__unused__));
  std::atomic<uint64_t> last_written{0};
  Writer* writer;
  Reader* reader;
  char pad2[128] __attribute__((__unused__));
  std::atomic<uint64_t> last_read{0};
  std::unique_ptr<rocksdb::Iterator> it;
  std::unique_ptr<rocksdb::Iterator> it_cacheonly;
  Key upper_bound;
  rocksdb::Slice upper_bound_slice;
  char pad3[128] __attribute__((__unused__));
};

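// A Reader owns one thread that waits on a semaphore for write notifications.
// Pending shards are queued under queue_mutex_; the bitset deduplicates
// notifications so a shard is queued at most once until it is drained.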
struct Reader {
 public:
  explicit Reader(std::vector<ShardState>* shard_states, rocksdb::DB* db)
      : shard_states_(shard_states), db_(db) {
    sem_init(&sem_, 0, 0);
    thread_ = port::Thread(&Reader::run, this);
  }

  void run() {
    while (1) {
      sem_wait(&sem_);
      if (done_.load()) {
        break;
      }

      uint64_t shard;
      {
        std::lock_guard<std::mutex> guard(queue_mutex_);
        assert(!shards_pending_queue_.empty());
        shard = shards_pending_queue_.front();
        shards_pending_queue_.pop();
        shards_pending_set_.reset(shard);
      }
      readOnceFromShard(shard);
    }
  }

  void readOnceFromShard(uint64_t shard) {
    ShardState& state = (*shard_states_)[shard];
    if (!state.it) {
      // Initialize iterators
      rocksdb::ReadOptions options;
      options.tailing = true;
      if (FLAGS_iterate_upper_bound) {
        state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
        state.upper_bound_slice = rocksdb::Slice(
            (const char*)&state.upper_bound, sizeof(state.upper_bound));
        options.iterate_upper_bound = &state.upper_bound_slice;
      }

      state.it.reset(db_->NewIterator(options));

      if (FLAGS_cache_only_first) {
        options.read_tier = rocksdb::ReadTier::kBlockCacheTier;
        state.it_cacheonly.reset(db_->NewIterator(options));
      }
    }

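    // Catch up to the last written seqno. When --cache_only_first is set, the
    // block-cache-only iterator is tried first; an Incomplete status means the
    // data was not in cache, which is counted as a miss before falling back to
    // the regular tailing iterator.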
    const uint64_t upto = state.last_written.load();
    for (rocksdb::Iterator* it : {state.it_cacheonly.get(), state.it.get()}) {
      if (it == nullptr) {
        continue;
      }
      if (state.last_read.load() >= upto) {
        break;
      }
      bool need_seek = true;
      for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
        if (need_seek) {
          Key from(shard, state.last_read.load() + 1);
          it->Seek(rocksdb::Slice((const char*)&from, sizeof(from)));
          need_seek = false;
        } else {
          it->Next();
        }
        if (it->status().IsIncomplete()) {
          ++::stats.cache_misses;
          break;
        }
        assert(it->Valid());
        assert(it->key().size() == sizeof(Key));
        Key key;
        memcpy(&key, it->key().data(), it->key().size());
        // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
        //         shard, seq, key.shard(), key.seqno());
        assert(key.shard() == shard);
        assert(key.seqno() == seq);
        state.last_read.store(seq);
        ++::stats.read;
      }
    }
  }

  void onWrite(uint64_t shard) {
    {
      std::lock_guard<std::mutex> guard(queue_mutex_);
      if (!shards_pending_set_.test(shard)) {
        shards_pending_queue_.push(shard);
        shards_pending_set_.set(shard);
        sem_post(&sem_);
      }
    }
  }

  ~Reader() {
    done_.store(true);
    sem_post(&sem_);
    thread_.join();
  }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  rocksdb::DB* db_;
  rocksdb::port::Thread thread_;
  sem_t sem_;
  std::mutex queue_mutex_;
  std::bitset<MAX_SHARDS + 1> shards_pending_set_;
  std::queue<uint64_t> shards_pending_queue_;
  std::atomic<bool> done_{false};
  char pad2[128] __attribute__((__unused__));
};

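// A Writer issues Puts to its assigned shards at a fixed aggregate rate
// (--rate, split across --writers threads), picking a shard at random for
// each write and notifying that shard's Reader afterwards.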
struct Writer {
  explicit Writer(std::vector<ShardState>* shard_states, rocksdb::DB* db)
      : shard_states_(shard_states), db_(db) {}

  void start() { thread_ = port::Thread(&Writer::run, this); }

  void run() {
    std::queue<std::chrono::steady_clock::time_point> workq;
    std::chrono::steady_clock::time_point deadline(
        std::chrono::steady_clock::now() +
        std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
    std::vector<uint64_t> my_shards;
    for (int i = 1; i <= FLAGS_shards; ++i) {
      if ((*shard_states_)[i].writer == this) {
        my_shards.push_back(i);
      }
    }

    std::mt19937 rng{std::random_device()()};
    std::uniform_int_distribution<int> shard_dist(
        0, static_cast<int>(my_shards.size()) - 1);
    std::string value(FLAGS_value_size, '*');

    while (1) {
      auto now = std::chrono::steady_clock::now();
      if (FLAGS_runtime >= 0 && now >= deadline) {
        break;
      }
      if (workq.empty()) {
        for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
          std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
          workq.push(now + offset);
        }
      }
      while (!workq.empty() && workq.front() < now) {
        workq.pop();
        uint64_t shard = my_shards[shard_dist(rng)];
        ShardState& state = (*shard_states_)[shard];
        uint64_t seqno = state.last_written.load() + 1;
        Key key(shard, seqno);
        // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
        rocksdb::Status status =
            db_->Put(rocksdb::WriteOptions(),
                     rocksdb::Slice((const char*)&key, sizeof(key)),
                     rocksdb::Slice(value));
        assert(status.ok());
        state.last_written.store(seqno);
        state.reader->onWrite(shard);
        ++::stats.written;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // fprintf(stderr, "Writer done\n");
  }

  ~Writer() { thread_.join(); }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  rocksdb::DB* db_;
  rocksdb::port::Thread thread_;
  char pad2[128] __attribute__((__unused__));
};

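// Prints cumulative and per-interval throughput roughly once per second until
// it is destroyed at the end of main().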
struct StatsThread {
  explicit StatsThread(rocksdb::DB* db)
      : db_(db), thread_(&StatsThread::run, this) {}

  void run() {
    // using namespace std::chrono;
    auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
    uint64_t wlast = 0, rlast = 0;
    while (!done_.load()) {
      {
        std::unique_lock<std::mutex> lock(cvm_);
        cv_.wait_for(lock, std::chrono::seconds(1));
      }
      auto now = std::chrono::steady_clock::now();
      double elapsed =
          std::chrono::duration_cast<std::chrono::duration<double> >(
              now - tlast).count();
      uint64_t w = ::stats.written.load();
      uint64_t r = ::stats.read.load();
      fprintf(stderr,
              "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
              "r/s %10.0f | cache misses %10ld\n",
              db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
              std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
                  .count(),
              w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
              ::stats.cache_misses.load());
      wlast = w;
      rlast = r;
      tlast = now;
    }
  }

  ~StatsThread() {
    {
      std::lock_guard<std::mutex> guard(cvm_);
      done_.store(true);
    }
    cv_.notify_all();
    thread_.join();
  }

 private:
  rocksdb::DB* db_;
  std::mutex cvm_;
  std::condition_variable cv_;
  rocksdb::port::Thread thread_;
  std::atomic<bool> done_{false};
};

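// Sets up a fresh database tuned for this workload (no compression, no
// compaction, direct I/O for flush), spreads the shards randomly across the
// Readers and Writers, then lets the writers run until --runtime expires.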
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);

  std::mt19937 rng{std::random_device()()};
  rocksdb::Status status;
  std::string path = rocksdb::test::PerThreadDBPath("forward_iterator_test");
  fprintf(stderr, "db path is %s\n", path.c_str());
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compression = rocksdb::CompressionType::kNoCompression;
  options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = 99999;
  options.level0_stop_writes_trigger = 99999;
  options.use_direct_io_for_flush_and_compaction = true;
  options.write_buffer_size = FLAGS_memtable_size;
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size);
  table_options.block_size = FLAGS_block_size;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  status = rocksdb::DestroyDB(path, options);
  assert(status.ok());
  rocksdb::DB* db_raw;
  status = rocksdb::DB::Open(options, path, &db_raw);
  assert(status.ok());
  std::unique_ptr<rocksdb::DB> db(db_raw);

  std::vector<ShardState> shard_states(FLAGS_shards + 1);
  std::deque<Reader> readers;
  while (static_cast<int>(readers.size()) < FLAGS_readers) {
    readers.emplace_back(&shard_states, db_raw);
  }
  std::deque<Writer> writers;
  while (static_cast<int>(writers.size()) < FLAGS_writers) {
    writers.emplace_back(&shard_states, db_raw);
  }

  // Each shard gets a random reader and random writer assigned to it
  for (int i = 1; i <= FLAGS_shards; ++i) {
    std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
    std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
    shard_states[i].reader = &readers[reader_dist(rng)];
    shard_states[i].writer = &writers[writer_dist(rng)];
  }

  StatsThread stats_thread(db_raw);
  for (Writer& w : writers) {
    w.start();
  }

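  // Destroying the Writers joins their threads (each runs until the deadline);
  // the Readers are then torn down the same way. stats_thread and the DB are
  // destroyed when main returns.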
  writers.clear();
  readers.clear();
}
#endif  // !defined(GFLAGS) || defined(ROCKSDB_LITE)