]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
#ifndef GFLAGS
#include <cstdio>
// Without gflags we cannot parse the benchmark's flags; print guidance
// and exit (main's implicit return value is 0).
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
7c673cae FG |
#include <unistd.h>

#include <atomic>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "rocksdb/env.h"

#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

#include "monitoring/histogram.h"
#include "port/port.h"
#include "table/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
30 | ||
// Run duration and concurrency.
DEFINE_int32(nsec, 10, "nsec");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
// On-disk locations for the cache file and the info log.
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
// Cache geometry. The cache_size default is a sentinel meaning "unset";
// NewVolatileCache() asserts that it has been set explicitly.
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
// Which tier implementation to exercise (see main()).
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
// In benchmark mode the payload of each read is not verified.
DEFINE_bool(benchmark, false, "Benchmark mode");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
45 | ||
46 | namespace rocksdb { | |
47 | ||
48 | std::unique_ptr<PersistentCacheTier> NewVolatileCache() { | |
49 | assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max()); | |
50 | std::unique_ptr<PersistentCacheTier> pcache( | |
51 | new VolatileCacheTier(FLAGS_cache_size)); | |
52 | return pcache; | |
53 | } | |
54 | ||
55 | std::unique_ptr<PersistentCacheTier> NewBlockCache() { | |
56 | std::shared_ptr<Logger> log; | |
57 | if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) { | |
58 | fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str()); | |
59 | return nullptr; | |
60 | } | |
61 | ||
62 | PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log); | |
63 | opt.writer_dispatch_size = FLAGS_writer_iosize; | |
64 | opt.writer_qdepth = FLAGS_writer_qdepth; | |
65 | opt.pipeline_writes = FLAGS_enable_pipelined_writes; | |
66 | opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); | |
67 | std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt)); | |
68 | Status status = cache->Open(); | |
69 | return cache; | |
70 | } | |
71 | ||
72 | // create a new cache tier | |
73 | // construct a tiered RAM+Block cache | |
74 | std::unique_ptr<PersistentTieredCache> NewTieredCache( | |
75 | const size_t mem_size, const PersistentCacheConfig& opt) { | |
76 | std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache()); | |
77 | // create primary tier | |
78 | assert(mem_size); | |
79 | auto pcache = | |
80 | std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size)); | |
81 | tcache->AddTier(pcache); | |
82 | // create secondary tier | |
83 | auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt)); | |
84 | tcache->AddTier(scache); | |
85 | ||
86 | Status s = tcache->Open(); | |
87 | assert(s.ok()); | |
88 | return tcache; | |
89 | } | |
90 | ||
91 | std::unique_ptr<PersistentTieredCache> NewTieredCache() { | |
92 | std::shared_ptr<Logger> log; | |
93 | if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) { | |
94 | fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str()); | |
95 | abort(); | |
96 | } | |
97 | ||
98 | auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100); | |
99 | PersistentCacheConfig opt(Env::Default(), FLAGS_path, | |
100 | (1 - pct) * FLAGS_cache_size, log); | |
101 | opt.writer_dispatch_size = FLAGS_writer_iosize; | |
102 | opt.writer_qdepth = FLAGS_writer_qdepth; | |
103 | opt.pipeline_writes = FLAGS_enable_pipelined_writes; | |
104 | opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max(); | |
105 | return NewTieredCache(FLAGS_cache_size * pct, opt); | |
106 | } | |
107 | ||
108 | // | |
109 | // Benchmark driver | |
110 | // | |
111 | class CacheTierBenchmark { | |
112 | public: | |
113 | explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache) | |
114 | : cache_(cache) { | |
115 | if (FLAGS_nthread_read) { | |
116 | fprintf(stdout, "Pre-populating\n"); | |
117 | Prepop(); | |
118 | fprintf(stdout, "Pre-population completed\n"); | |
119 | } | |
120 | ||
121 | stats_.Clear(); | |
122 | ||
123 | // Start IO threads | |
124 | std::list<port::Thread> threads; | |
125 | Spawn(FLAGS_nthread_write, &threads, | |
126 | std::bind(&CacheTierBenchmark::Write, this)); | |
127 | Spawn(FLAGS_nthread_read, &threads, | |
128 | std::bind(&CacheTierBenchmark::Read, this)); | |
129 | ||
130 | // Wait till FLAGS_nsec and then signal to quit | |
131 | StopWatchNano t(Env::Default(), /*auto_start=*/true); | |
132 | size_t sec = t.ElapsedNanos() / 1000000000ULL; | |
133 | while (!quit_) { | |
134 | sec = t.ElapsedNanos() / 1000000000ULL; | |
135 | quit_ = sec > size_t(FLAGS_nsec); | |
136 | /* sleep override */ sleep(1); | |
137 | } | |
138 | ||
139 | // Wait for threads to exit | |
140 | Join(&threads); | |
141 | // Print stats | |
142 | PrintStats(sec); | |
143 | // Close the cache | |
144 | cache_->TEST_Flush(); | |
145 | cache_->Close(); | |
146 | } | |
147 | ||
148 | private: | |
149 | void PrintStats(const size_t sec) { | |
150 | std::ostringstream msg; | |
151 | msg << "Test stats" << std::endl | |
152 | << "* Elapsed: " << sec << " s" << std::endl | |
153 | << "* Write Latency:" << std::endl | |
154 | << stats_.write_latency_.ToString() << std::endl | |
155 | << "* Read Latency:" << std::endl | |
156 | << stats_.read_latency_.ToString() << std::endl | |
157 | << "* Bytes written:" << std::endl | |
158 | << stats_.bytes_written_.ToString() << std::endl | |
159 | << "* Bytes read:" << std::endl | |
160 | << stats_.bytes_read_.ToString() << std::endl | |
161 | << "Cache stats:" << std::endl | |
162 | << cache_->PrintStats() << std::endl; | |
163 | fprintf(stderr, "%s\n", msg.str().c_str()); | |
164 | } | |
165 | ||
166 | // | |
167 | // Insert implementation and corresponding helper functions | |
168 | // | |
169 | void Prepop() { | |
170 | for (uint64_t i = 0; i < 1024 * 1024; ++i) { | |
171 | InsertKey(i); | |
172 | insert_key_limit_++; | |
173 | read_key_limit_++; | |
174 | } | |
175 | ||
176 | // Wait until data is flushed | |
177 | cache_->TEST_Flush(); | |
178 | // warmup the cache | |
179 | for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) { | |
180 | } | |
181 | } | |
182 | ||
183 | void Write() { | |
184 | while (!quit_) { | |
185 | InsertKey(insert_key_limit_++); | |
186 | } | |
187 | } | |
188 | ||
189 | void InsertKey(const uint64_t key) { | |
190 | // construct key | |
191 | uint64_t k[3]; | |
192 | Slice block_key = FillKey(k, key); | |
193 | ||
194 | // construct value | |
195 | auto block = NewBlock(key); | |
196 | ||
197 | // insert | |
198 | StopWatchNano timer(Env::Default(), /*auto_start=*/true); | |
199 | while (true) { | |
200 | Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize); | |
201 | if (status.ok()) { | |
202 | break; | |
203 | } | |
204 | ||
205 | // transient error is possible if we run without pipelining | |
206 | assert(!FLAGS_enable_pipelined_writes); | |
207 | } | |
208 | ||
209 | // adjust stats | |
210 | const size_t elapsed_micro = timer.ElapsedNanos() / 1000; | |
211 | stats_.write_latency_.Add(elapsed_micro); | |
212 | stats_.bytes_written_.Add(FLAGS_iosize); | |
213 | } | |
214 | ||
215 | // | |
216 | // Read implementation | |
217 | // | |
218 | void Read() { | |
219 | while (!quit_) { | |
220 | ReadKey(random() % read_key_limit_); | |
221 | } | |
222 | } | |
223 | ||
224 | void ReadKey(const uint64_t val) { | |
225 | // construct key | |
226 | uint64_t k[3]; | |
227 | Slice key = FillKey(k, val); | |
228 | ||
229 | // Lookup in cache | |
230 | StopWatchNano timer(Env::Default(), /*auto_start=*/true); | |
231 | std::unique_ptr<char[]> block; | |
232 | size_t size; | |
233 | Status status = cache_->Lookup(key, &block, &size); | |
234 | if (!status.ok()) { | |
235 | fprintf(stderr, "%s\n", status.ToString().c_str()); | |
236 | } | |
237 | assert(status.ok()); | |
238 | assert(size == (size_t) FLAGS_iosize); | |
239 | ||
240 | // adjust stats | |
241 | const size_t elapsed_micro = timer.ElapsedNanos() / 1000; | |
242 | stats_.read_latency_.Add(elapsed_micro); | |
243 | stats_.bytes_read_.Add(FLAGS_iosize); | |
244 | ||
245 | // verify content | |
246 | if (!FLAGS_benchmark) { | |
247 | auto expected_block = NewBlock(val); | |
248 | assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0); | |
249 | } | |
250 | } | |
251 | ||
252 | // create data for a key by filling with a certain pattern | |
253 | std::unique_ptr<char[]> NewBlock(const uint64_t val) { | |
254 | unique_ptr<char[]> data(new char[FLAGS_iosize]); | |
255 | memset(data.get(), val % 255, FLAGS_iosize); | |
256 | return data; | |
257 | } | |
258 | ||
259 | // spawn threads | |
260 | void Spawn(const size_t n, std::list<port::Thread>* threads, | |
261 | const std::function<void()>& fn) { | |
262 | for (size_t i = 0; i < n; ++i) { | |
263 | threads->emplace_back(fn); | |
264 | } | |
265 | } | |
266 | ||
267 | // join threads | |
268 | void Join(std::list<port::Thread>* threads) { | |
269 | for (auto& th : *threads) { | |
270 | th.join(); | |
271 | } | |
272 | } | |
273 | ||
274 | // construct key | |
275 | Slice FillKey(uint64_t (&k)[3], const uint64_t val) { | |
276 | k[0] = k[1] = 0; | |
277 | k[2] = val; | |
278 | void* p = static_cast<void*>(&k); | |
279 | return Slice(static_cast<char*>(p), sizeof(k)); | |
280 | } | |
281 | ||
282 | // benchmark stats | |
283 | struct Stats { | |
284 | void Clear() { | |
285 | bytes_written_.Clear(); | |
286 | bytes_read_.Clear(); | |
287 | read_latency_.Clear(); | |
288 | write_latency_.Clear(); | |
289 | } | |
290 | ||
291 | HistogramImpl bytes_written_; | |
292 | HistogramImpl bytes_read_; | |
293 | HistogramImpl read_latency_; | |
294 | HistogramImpl write_latency_; | |
295 | }; | |
296 | ||
297 | std::shared_ptr<PersistentCacheTier> cache_; // cache implementation | |
298 | std::atomic<uint64_t> insert_key_limit_{0}; // data inserted upto | |
299 | std::atomic<uint64_t> read_key_limit_{0}; // data can be read safely upto | |
300 | bool quit_ = false; // Quit thread ? | |
301 | mutable Stats stats_; // Stats | |
302 | }; | |
303 | ||
304 | } // namespace rocksdb | |
305 | ||
306 | // | |
307 | // main | |
308 | // | |
309 | int main(int argc, char** argv) { | |
11fdf7f2 TL |
310 | GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") + |
311 | std::string(argv[0]) + " [OPTIONS]..."); | |
312 | GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false); | |
7c673cae FG |
313 | |
314 | std::ostringstream msg; | |
315 | msg << "Config" << std::endl | |
316 | << "======" << std::endl | |
317 | << "* nsec=" << FLAGS_nsec << std::endl | |
318 | << "* nthread_write=" << FLAGS_nthread_write << std::endl | |
319 | << "* path=" << FLAGS_path << std::endl | |
320 | << "* cache_size=" << FLAGS_cache_size << std::endl | |
321 | << "* iosize=" << FLAGS_iosize << std::endl | |
322 | << "* writer_iosize=" << FLAGS_writer_iosize << std::endl | |
323 | << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl | |
324 | << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes | |
325 | << std::endl | |
326 | << "* cache_type=" << FLAGS_cache_type << std::endl | |
327 | << "* benchmark=" << FLAGS_benchmark << std::endl | |
328 | << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl; | |
329 | ||
330 | fprintf(stderr, "%s\n", msg.str().c_str()); | |
331 | ||
332 | std::shared_ptr<rocksdb::PersistentCacheTier> cache; | |
333 | if (FLAGS_cache_type == "block_cache") { | |
334 | fprintf(stderr, "Using block cache implementation\n"); | |
335 | cache = rocksdb::NewBlockCache(); | |
336 | } else if (FLAGS_cache_type == "volatile") { | |
337 | fprintf(stderr, "Using volatile cache implementation\n"); | |
338 | cache = rocksdb::NewVolatileCache(); | |
339 | } else if (FLAGS_cache_type == "tiered") { | |
340 | fprintf(stderr, "Using tiered cache implementation\n"); | |
341 | cache = rocksdb::NewTieredCache(); | |
342 | } else { | |
343 | fprintf(stderr, "Unknown option for cache\n"); | |
344 | } | |
345 | ||
346 | assert(cache); | |
347 | if (!cache) { | |
348 | fprintf(stderr, "Error creating cache\n"); | |
349 | abort(); | |
350 | } | |
351 | ||
352 | std::unique_ptr<rocksdb::CacheTierBenchmark> benchmark( | |
353 | new rocksdb::CacheTierBenchmark(std::move(cache))); | |
354 | ||
355 | return 0; | |
356 | } | |
#endif  // #ifndef GFLAGS
#else
// ROCKSDB_LITE builds exclude tools; provide a no-op main.
int main(int, char**) { return 0; }
#endif