// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE

#ifndef GFLAGS
#include <cstdio>
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
#include <atomic>
#include <functional>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/system_clock.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

DEFINE_int32(nsec, 10, "nsec");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
DEFINE_bool(benchmark, false, "Benchmark mode");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");

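// Example invocation using the flags defined above. The flag values are
// illustrative and the binary name depends on the build target (assumed to
// be persistent_cache_bench here):
//
//   ./persistent_cache_bench --cache_type=block_cache \
//       --path=/tmp/microbench/blkcache --cache_size=1073741824 \
//       --nsec=10 --nthread_write=1 --nthread_read=1
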
namespace ROCKSDB_NAMESPACE {

std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
  std::unique_ptr<PersistentCacheTier> pcache(
      new VolatileCacheTier(FLAGS_cache_size));
  return pcache;
}

std::unique_ptr<PersistentCacheTier> NewBlockCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    return nullptr;
  }

  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
  Status status = cache->Open();
  return cache;
}

// create a new cache tier
// construct a tiered RAM+Block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache =
      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}

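// Build a tiered cache from the command-line flags: the volatile (RAM) tier
// receives volatile_cache_pct percent of cache_size, and the on-disk block
// cache tier receives the remainder.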
std::unique_ptr<PersistentTieredCache> NewTieredCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    abort();
  }

  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
                            (1 - pct) * FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  return NewTieredCache(FLAGS_cache_size * pct, opt);
}

//
// Benchmark driver
//
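// Pre-populates the cache when read threads are configured, spawns the
// requested writer and reader threads, runs for roughly --nsec seconds, and
// then reports latency and byte-count histograms along with the cache's own
// statistics.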
class CacheTierBenchmark {
 public:
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(cache) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(SystemClock::Default().get(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > size_t(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }

 private:
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats" << std::endl
        << "* Elapsed: " << sec << " s" << std::endl
        << "* Write Latency:" << std::endl
        << stats_.write_latency_.ToString() << std::endl
        << "* Read Latency:" << std::endl
        << stats_.read_latency_.ToString() << std::endl
        << "* Bytes written:" << std::endl
        << stats_.bytes_written_.ToString() << std::endl
        << "* Bytes read:" << std::endl
        << stats_.bytes_read_.ToString() << std::endl
        << "Cache stats:" << std::endl
        << cache_->PrintStats() << std::endl;
    fprintf(stderr, "%s\n", msg.str().c_str());
  }

  //
  // Insert implementation and corresponding helper functions
  //
  void Prepop() {
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();
    // warmup the cache
    for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) {
    }
  }

  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }

  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }

      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }

  //
  // Read implementation
  //
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size;
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == (size_t)FLAGS_iosize);

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }

  // create data for a key by filling with a certain pattern
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }

  // spawn threads
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }

  // construct key
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }

  // benchmark stats
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };

  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted up to
  std::atomic<uint64_t> read_key_limit_{0};     // data can be read safely up to
  bool quit_ = false;                           // Quit thread?
  mutable Stats stats_;                         // Stats
};

}  // namespace ROCKSDB_NAMESPACE

//
// main
//
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
                                    std::string(argv[0]) + " [OPTIONS]...");
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);

  std::ostringstream msg;
  msg << "Config" << std::endl
      << "======" << std::endl
      << "* nsec=" << FLAGS_nsec << std::endl
      << "* nthread_write=" << FLAGS_nthread_write << std::endl
      << "* path=" << FLAGS_path << std::endl
      << "* cache_size=" << FLAGS_cache_size << std::endl
      << "* iosize=" << FLAGS_iosize << std::endl
      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
      << std::endl
      << "* cache_type=" << FLAGS_cache_type << std::endl
      << "* benchmark=" << FLAGS_benchmark << std::endl
      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;

  fprintf(stderr, "%s\n", msg.str().c_str());

  std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
  if (FLAGS_cache_type == "block_cache") {
    fprintf(stderr, "Using block cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewBlockCache();
  } else if (FLAGS_cache_type == "volatile") {
    fprintf(stderr, "Using volatile cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewVolatileCache();
  } else if (FLAGS_cache_type == "tiered") {
    fprintf(stderr, "Using tiered cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewTieredCache();
  } else {
    fprintf(stderr, "Unknown option for cache\n");
  }

  assert(cache);
  if (!cache) {
    fprintf(stderr, "Error creating cache\n");
    abort();
  }

  std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
      new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));

  return 0;
}
#endif  // #ifndef GFLAGS
#else
int main(int, char**) { return 0; }
#endif