]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #ifndef GFLAGS | |
9 | #include <cstdio> | |
10 | int main() { fprintf(stderr, "Please install gflags to run tools\n"); } | |
11 | #else | |
7c673cae FG |
12 | #include <atomic> |
13 | #include <functional> | |
14 | #include <memory> | |
15 | #include <sstream> | |
16 | #include <unordered_map> | |
17 | ||
7c673cae FG |
18 | #include "monitoring/histogram.h" |
19 | #include "port/port.h" | |
1e59de90 TL |
20 | #include "rocksdb/env.h" |
21 | #include "rocksdb/system_clock.h" | |
f67539c2 | 22 | #include "table/block_based/block_builder.h" |
11fdf7f2 | 23 | #include "util/gflags_compat.h" |
7c673cae FG |
24 | #include "util/mutexlock.h" |
25 | #include "util/stop_watch.h" | |
1e59de90 TL |
26 | #include "utilities/persistent_cache/block_cache_tier.h" |
27 | #include "utilities/persistent_cache/persistent_cache_tier.h" | |
28 | #include "utilities/persistent_cache/volatile_tier_impl.h" | |
7c673cae FG |
29 | |
// Benchmark duration in seconds.
DEFINE_int32(nsec, 10, "nsec");
// Number of concurrent insert (writer) threads.
DEFINE_int32(nthread_write, 1, "Insert threads");
// Number of concurrent lookup (reader) threads; also gates pre-population.
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
// Defaults to "unset" (uint64 max); NewVolatileCache() asserts a real size
// was supplied on the command line.
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
// Selects which factory main() calls: block_cache, volatile, or tiered.
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
// When set, skips the memcmp content verification on reads.
DEFINE_bool(benchmark, false, "Benchmark mode");
// Only used by the tiered configuration (split of --cache_size).
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
44 | ||
f67539c2 | 45 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
46 | |
47 | std::unique_ptr<PersistentCacheTier> NewVolatileCache() { | |
48 | assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max()); | |
49 | std::unique_ptr<PersistentCacheTier> pcache( | |
50 | new VolatileCacheTier(FLAGS_cache_size)); | |
51 | return pcache; | |
52 | } | |
53 | ||
54 | std::unique_ptr<PersistentCacheTier> NewBlockCache() { | |
55 | std::shared_ptr<Logger> log; | |
56 | if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) { | |
57 | fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str()); | |
58 | return nullptr; | |
59 | } | |
60 | ||
61 | PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log); | |
62 | opt.writer_dispatch_size = FLAGS_writer_iosize; | |
63 | opt.writer_qdepth = FLAGS_writer_qdepth; | |
64 | opt.pipeline_writes = FLAGS_enable_pipelined_writes; | |
65 | opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); | |
66 | std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt)); | |
67 | Status status = cache->Open(); | |
68 | return cache; | |
69 | } | |
70 | ||
71 | // create a new cache tier | |
72 | // construct a tiered RAM+Block cache | |
73 | std::unique_ptr<PersistentTieredCache> NewTieredCache( | |
74 | const size_t mem_size, const PersistentCacheConfig& opt) { | |
75 | std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache()); | |
76 | // create primary tier | |
77 | assert(mem_size); | |
78 | auto pcache = | |
79 | std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size)); | |
80 | tcache->AddTier(pcache); | |
81 | // create secondary tier | |
82 | auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt)); | |
83 | tcache->AddTier(scache); | |
84 | ||
85 | Status s = tcache->Open(); | |
86 | assert(s.ok()); | |
87 | return tcache; | |
88 | } | |
89 | ||
90 | std::unique_ptr<PersistentTieredCache> NewTieredCache() { | |
91 | std::shared_ptr<Logger> log; | |
92 | if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) { | |
93 | fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str()); | |
94 | abort(); | |
95 | } | |
96 | ||
97 | auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100); | |
98 | PersistentCacheConfig opt(Env::Default(), FLAGS_path, | |
99 | (1 - pct) * FLAGS_cache_size, log); | |
100 | opt.writer_dispatch_size = FLAGS_writer_iosize; | |
101 | opt.writer_qdepth = FLAGS_writer_qdepth; | |
102 | opt.pipeline_writes = FLAGS_enable_pipelined_writes; | |
103 | opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); | |
104 | return NewTieredCache(FLAGS_cache_size * pct, opt); | |
105 | } | |
106 | ||
107 | // | |
108 | // Benchmark driver | |
109 | // | |
110 | class CacheTierBenchmark { | |
111 | public: | |
112 | explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache) | |
113 | : cache_(cache) { | |
114 | if (FLAGS_nthread_read) { | |
115 | fprintf(stdout, "Pre-populating\n"); | |
116 | Prepop(); | |
117 | fprintf(stdout, "Pre-population completed\n"); | |
118 | } | |
119 | ||
120 | stats_.Clear(); | |
121 | ||
122 | // Start IO threads | |
123 | std::list<port::Thread> threads; | |
124 | Spawn(FLAGS_nthread_write, &threads, | |
125 | std::bind(&CacheTierBenchmark::Write, this)); | |
126 | Spawn(FLAGS_nthread_read, &threads, | |
127 | std::bind(&CacheTierBenchmark::Read, this)); | |
128 | ||
129 | // Wait till FLAGS_nsec and then signal to quit | |
1e59de90 | 130 | StopWatchNano t(SystemClock::Default().get(), /*auto_start=*/true); |
7c673cae FG |
131 | size_t sec = t.ElapsedNanos() / 1000000000ULL; |
132 | while (!quit_) { | |
133 | sec = t.ElapsedNanos() / 1000000000ULL; | |
134 | quit_ = sec > size_t(FLAGS_nsec); | |
135 | /* sleep override */ sleep(1); | |
136 | } | |
137 | ||
138 | // Wait for threads to exit | |
139 | Join(&threads); | |
140 | // Print stats | |
141 | PrintStats(sec); | |
142 | // Close the cache | |
143 | cache_->TEST_Flush(); | |
144 | cache_->Close(); | |
145 | } | |
146 | ||
147 | private: | |
148 | void PrintStats(const size_t sec) { | |
149 | std::ostringstream msg; | |
150 | msg << "Test stats" << std::endl | |
151 | << "* Elapsed: " << sec << " s" << std::endl | |
152 | << "* Write Latency:" << std::endl | |
153 | << stats_.write_latency_.ToString() << std::endl | |
154 | << "* Read Latency:" << std::endl | |
155 | << stats_.read_latency_.ToString() << std::endl | |
156 | << "* Bytes written:" << std::endl | |
157 | << stats_.bytes_written_.ToString() << std::endl | |
158 | << "* Bytes read:" << std::endl | |
159 | << stats_.bytes_read_.ToString() << std::endl | |
160 | << "Cache stats:" << std::endl | |
161 | << cache_->PrintStats() << std::endl; | |
162 | fprintf(stderr, "%s\n", msg.str().c_str()); | |
163 | } | |
164 | ||
165 | // | |
166 | // Insert implementation and corresponding helper functions | |
167 | // | |
168 | void Prepop() { | |
169 | for (uint64_t i = 0; i < 1024 * 1024; ++i) { | |
170 | InsertKey(i); | |
171 | insert_key_limit_++; | |
172 | read_key_limit_++; | |
173 | } | |
174 | ||
175 | // Wait until data is flushed | |
176 | cache_->TEST_Flush(); | |
177 | // warmup the cache | |
178 | for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) { | |
179 | } | |
180 | } | |
181 | ||
182 | void Write() { | |
183 | while (!quit_) { | |
184 | InsertKey(insert_key_limit_++); | |
185 | } | |
186 | } | |
187 | ||
188 | void InsertKey(const uint64_t key) { | |
189 | // construct key | |
190 | uint64_t k[3]; | |
191 | Slice block_key = FillKey(k, key); | |
192 | ||
193 | // construct value | |
194 | auto block = NewBlock(key); | |
195 | ||
196 | // insert | |
1e59de90 | 197 | StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true); |
7c673cae FG |
198 | while (true) { |
199 | Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize); | |
200 | if (status.ok()) { | |
201 | break; | |
202 | } | |
203 | ||
204 | // transient error is possible if we run without pipelining | |
205 | assert(!FLAGS_enable_pipelined_writes); | |
206 | } | |
207 | ||
208 | // adjust stats | |
209 | const size_t elapsed_micro = timer.ElapsedNanos() / 1000; | |
210 | stats_.write_latency_.Add(elapsed_micro); | |
211 | stats_.bytes_written_.Add(FLAGS_iosize); | |
212 | } | |
213 | ||
214 | // | |
215 | // Read implementation | |
216 | // | |
217 | void Read() { | |
218 | while (!quit_) { | |
219 | ReadKey(random() % read_key_limit_); | |
220 | } | |
221 | } | |
222 | ||
223 | void ReadKey(const uint64_t val) { | |
224 | // construct key | |
225 | uint64_t k[3]; | |
226 | Slice key = FillKey(k, val); | |
227 | ||
228 | // Lookup in cache | |
1e59de90 | 229 | StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true); |
7c673cae FG |
230 | std::unique_ptr<char[]> block; |
231 | size_t size; | |
232 | Status status = cache_->Lookup(key, &block, &size); | |
233 | if (!status.ok()) { | |
234 | fprintf(stderr, "%s\n", status.ToString().c_str()); | |
235 | } | |
236 | assert(status.ok()); | |
1e59de90 | 237 | assert(size == (size_t)FLAGS_iosize); |
7c673cae FG |
238 | |
239 | // adjust stats | |
240 | const size_t elapsed_micro = timer.ElapsedNanos() / 1000; | |
241 | stats_.read_latency_.Add(elapsed_micro); | |
242 | stats_.bytes_read_.Add(FLAGS_iosize); | |
243 | ||
244 | // verify content | |
245 | if (!FLAGS_benchmark) { | |
246 | auto expected_block = NewBlock(val); | |
247 | assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0); | |
248 | } | |
249 | } | |
250 | ||
251 | // create data for a key by filling with a certain pattern | |
252 | std::unique_ptr<char[]> NewBlock(const uint64_t val) { | |
494da23a | 253 | std::unique_ptr<char[]> data(new char[FLAGS_iosize]); |
7c673cae FG |
254 | memset(data.get(), val % 255, FLAGS_iosize); |
255 | return data; | |
256 | } | |
257 | ||
258 | // spawn threads | |
259 | void Spawn(const size_t n, std::list<port::Thread>* threads, | |
260 | const std::function<void()>& fn) { | |
261 | for (size_t i = 0; i < n; ++i) { | |
262 | threads->emplace_back(fn); | |
263 | } | |
264 | } | |
265 | ||
266 | // join threads | |
267 | void Join(std::list<port::Thread>* threads) { | |
268 | for (auto& th : *threads) { | |
269 | th.join(); | |
270 | } | |
271 | } | |
272 | ||
273 | // construct key | |
274 | Slice FillKey(uint64_t (&k)[3], const uint64_t val) { | |
275 | k[0] = k[1] = 0; | |
276 | k[2] = val; | |
277 | void* p = static_cast<void*>(&k); | |
278 | return Slice(static_cast<char*>(p), sizeof(k)); | |
279 | } | |
280 | ||
281 | // benchmark stats | |
282 | struct Stats { | |
283 | void Clear() { | |
284 | bytes_written_.Clear(); | |
285 | bytes_read_.Clear(); | |
286 | read_latency_.Clear(); | |
287 | write_latency_.Clear(); | |
288 | } | |
289 | ||
290 | HistogramImpl bytes_written_; | |
291 | HistogramImpl bytes_read_; | |
292 | HistogramImpl read_latency_; | |
293 | HistogramImpl write_latency_; | |
294 | }; | |
295 | ||
296 | std::shared_ptr<PersistentCacheTier> cache_; // cache implementation | |
297 | std::atomic<uint64_t> insert_key_limit_{0}; // data inserted upto | |
298 | std::atomic<uint64_t> read_key_limit_{0}; // data can be read safely upto | |
299 | bool quit_ = false; // Quit thread ? | |
300 | mutable Stats stats_; // Stats | |
301 | }; | |
302 | ||
f67539c2 | 303 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
304 | |
305 | // | |
306 | // main | |
307 | // | |
308 | int main(int argc, char** argv) { | |
11fdf7f2 TL |
309 | GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") + |
310 | std::string(argv[0]) + " [OPTIONS]..."); | |
311 | GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false); | |
7c673cae FG |
312 | |
313 | std::ostringstream msg; | |
314 | msg << "Config" << std::endl | |
315 | << "======" << std::endl | |
316 | << "* nsec=" << FLAGS_nsec << std::endl | |
317 | << "* nthread_write=" << FLAGS_nthread_write << std::endl | |
318 | << "* path=" << FLAGS_path << std::endl | |
319 | << "* cache_size=" << FLAGS_cache_size << std::endl | |
320 | << "* iosize=" << FLAGS_iosize << std::endl | |
321 | << "* writer_iosize=" << FLAGS_writer_iosize << std::endl | |
322 | << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl | |
323 | << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes | |
324 | << std::endl | |
325 | << "* cache_type=" << FLAGS_cache_type << std::endl | |
326 | << "* benchmark=" << FLAGS_benchmark << std::endl | |
327 | << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl; | |
328 | ||
329 | fprintf(stderr, "%s\n", msg.str().c_str()); | |
330 | ||
f67539c2 | 331 | std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache; |
7c673cae FG |
332 | if (FLAGS_cache_type == "block_cache") { |
333 | fprintf(stderr, "Using block cache implementation\n"); | |
f67539c2 | 334 | cache = ROCKSDB_NAMESPACE::NewBlockCache(); |
7c673cae FG |
335 | } else if (FLAGS_cache_type == "volatile") { |
336 | fprintf(stderr, "Using volatile cache implementation\n"); | |
f67539c2 | 337 | cache = ROCKSDB_NAMESPACE::NewVolatileCache(); |
7c673cae FG |
338 | } else if (FLAGS_cache_type == "tiered") { |
339 | fprintf(stderr, "Using tiered cache implementation\n"); | |
f67539c2 | 340 | cache = ROCKSDB_NAMESPACE::NewTieredCache(); |
7c673cae FG |
341 | } else { |
342 | fprintf(stderr, "Unknown option for cache\n"); | |
343 | } | |
344 | ||
345 | assert(cache); | |
346 | if (!cache) { | |
347 | fprintf(stderr, "Error creating cache\n"); | |
348 | abort(); | |
349 | } | |
350 | ||
f67539c2 TL |
351 | std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark( |
352 | new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache))); | |
7c673cae FG |
353 | |
354 | return 0; | |
355 | } | |
356 | #endif // #ifndef GFLAGS | |
357 | #else | |
358 | int main(int, char**) { return 0; } | |
359 | #endif |