// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE

#ifndef GFLAGS
#include <cstdio>
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
#include <atomic>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "rocksdb/env.h"

#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

#include "monitoring/histogram.h"
#include "port/port.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"

DEFINE_int32(nsec, 10, "Duration of the benchmark in seconds");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for the cache file");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
DEFINE_bool(benchmark, false, "Benchmark mode");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");

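// Example invocation (illustrative only; the binary name and flag values
// below are placeholders, not part of this file):
//
//   ./persistent_cache_bench --cache_type=block_cache \
//     --path=/tmp/microbench/blkcache --cache_size=$((1024 * 1024 * 1024)) \
//     --nsec=10 --nthread_write=1 --nthread_read=4 --benchmark
//
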
namespace ROCKSDB_NAMESPACE {

std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
  std::unique_ptr<PersistentCacheTier> pcache(
      new VolatileCacheTier(FLAGS_cache_size));
  return pcache;
}

std::unique_ptr<PersistentCacheTier> NewBlockCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s\n", FLAGS_log_path.c_str());
    return nullptr;
  }

  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
  Status status = cache->Open();
  if (!status.ok()) {
    fprintf(stderr, "Error opening cache: %s\n", status.ToString().c_str());
    return nullptr;
  }
  return cache;
}

// Construct a tiered cache: a volatile (RAM) primary tier backed by a
// block (file) secondary tier.
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache =
      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}

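// Build a tiered cache from the command-line flags: volatile_cache_pct percent
// of cache_size goes to the volatile (RAM) tier and the remainder to the block
// (file) tier, so the default of 10 keeps 10% in memory and 90% on disk.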
std::unique_ptr<PersistentTieredCache> NewTieredCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s\n", FLAGS_log_path.c_str());
    abort();
  }

  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
                            (1 - pct) * FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  return NewTieredCache(FLAGS_cache_size * pct, opt);
}

//
// Benchmark driver
//
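// Pre-populates the cache with ~1M blocks (when lookup threads are
// configured), then runs insert and lookup threads for about FLAGS_nsec
// seconds and reports latency histograms, byte-count histograms, and the
// cache's own stats.
//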
class CacheTierBenchmark {
 public:
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(cache) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(Env::Default(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > size_t(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }

 private:
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats" << std::endl
        << "* Elapsed: " << sec << " s" << std::endl
        << "* Write Latency:" << std::endl
        << stats_.write_latency_.ToString() << std::endl
        << "* Read Latency:" << std::endl
        << stats_.read_latency_.ToString() << std::endl
        << "* Bytes written:" << std::endl
        << stats_.bytes_written_.ToString() << std::endl
        << "* Bytes read:" << std::endl
        << stats_.bytes_read_.ToString() << std::endl
        << "Cache stats:" << std::endl
        << cache_->PrintStats() << std::endl;
    fprintf(stderr, "%s\n", msg.str().c_str());
  }

  //
  // Insert implementation and corresponding helper functions
  //
  void Prepop() {
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();
    // warm up the cache
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      ReadKey(i);
    }
  }

  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }

  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }

      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }

  //
  // Read implementation
  //
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size;
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == (size_t) FLAGS_iosize);

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }

  // create data for a key by filling with a certain pattern
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }

  // spawn threads
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }

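  // Keys are three uint64_t words {0, 0, val} viewed as a 24-byte Slice, so
  // each distinct val maps to a distinct fixed-size key.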
  // construct key
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }

  // benchmark stats
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };

  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // highest key inserted so far
  std::atomic<uint64_t> read_key_limit_{0};     // highest key safe to read
  std::atomic<bool> quit_{false};               // signal threads to quit
  mutable Stats stats_;                         // benchmark stats
};

}  // namespace ROCKSDB_NAMESPACE

//
// main
//
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
                                    std::string(argv[0]) + " [OPTIONS]...");
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);

  std::ostringstream msg;
  msg << "Config" << std::endl
      << "======" << std::endl
      << "* nsec=" << FLAGS_nsec << std::endl
      << "* nthread_write=" << FLAGS_nthread_write << std::endl
      << "* path=" << FLAGS_path << std::endl
      << "* cache_size=" << FLAGS_cache_size << std::endl
      << "* iosize=" << FLAGS_iosize << std::endl
      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
      << std::endl
      << "* cache_type=" << FLAGS_cache_type << std::endl
      << "* benchmark=" << FLAGS_benchmark << std::endl
      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;

  fprintf(stderr, "%s\n", msg.str().c_str());

  std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
  if (FLAGS_cache_type == "block_cache") {
    fprintf(stderr, "Using block cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewBlockCache();
  } else if (FLAGS_cache_type == "volatile") {
    fprintf(stderr, "Using volatile cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewVolatileCache();
  } else if (FLAGS_cache_type == "tiered") {
    fprintf(stderr, "Using tiered cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewTieredCache();
  } else {
    fprintf(stderr, "Unknown option for cache\n");
  }

  assert(cache);
  if (!cache) {
    fprintf(stderr, "Error creating cache\n");
    abort();
  }

  std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
      new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));

  return 0;
}
#endif  // #ifndef GFLAGS
#else
int main(int, char**) { return 0; }
#endif