// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Fallback entry point: without gflags the benchmark cannot parse its
// command-line flags, so just tell the user how to enable the tool.
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#include <unistd.h>

#include <atomic>
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/system_clock.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"
30 DEFINE_int32(nsec
, 10, "nsec");
31 DEFINE_int32(nthread_write
, 1, "Insert threads");
32 DEFINE_int32(nthread_read
, 1, "Lookup threads");
33 DEFINE_string(path
, "/tmp/microbench/blkcache", "Path for cachefile");
34 DEFINE_string(log_path
, "/tmp/log", "Path for the log file");
35 DEFINE_uint64(cache_size
, std::numeric_limits
<uint64_t>::max(), "Cache size");
36 DEFINE_int32(iosize
, 4 * 1024, "Read IO size");
37 DEFINE_int32(writer_iosize
, 4 * 1024, "File writer IO size");
38 DEFINE_int32(writer_qdepth
, 1, "File writer qdepth");
39 DEFINE_bool(enable_pipelined_writes
, false, "Enable async writes");
40 DEFINE_string(cache_type
, "block_cache",
41 "Cache type. (block_cache, volatile, tiered)");
42 DEFINE_bool(benchmark
, false, "Benchmark mode");
43 DEFINE_int32(volatile_cache_pct
, 10, "Percentage of cache in memory tier.");
45 namespace ROCKSDB_NAMESPACE
{
47 std::unique_ptr
<PersistentCacheTier
> NewVolatileCache() {
48 assert(FLAGS_cache_size
!= std::numeric_limits
<uint64_t>::max());
49 std::unique_ptr
<PersistentCacheTier
> pcache(
50 new VolatileCacheTier(FLAGS_cache_size
));
54 std::unique_ptr
<PersistentCacheTier
> NewBlockCache() {
55 std::shared_ptr
<Logger
> log
;
56 if (!Env::Default()->NewLogger(FLAGS_log_path
, &log
).ok()) {
57 fprintf(stderr
, "Error creating log %s \n", FLAGS_log_path
.c_str());
61 PersistentCacheConfig
opt(Env::Default(), FLAGS_path
, FLAGS_cache_size
, log
);
62 opt
.writer_dispatch_size
= FLAGS_writer_iosize
;
63 opt
.writer_qdepth
= FLAGS_writer_qdepth
;
64 opt
.pipeline_writes
= FLAGS_enable_pipelined_writes
;
65 opt
.max_write_pipeline_backlog_size
= std::numeric_limits
<uint64_t>::max();
66 std::unique_ptr
<PersistentCacheTier
> cache(new BlockCacheTier(opt
));
67 Status status
= cache
->Open();
71 // create a new cache tier
72 // construct a tiered RAM+Block cache
73 std::unique_ptr
<PersistentTieredCache
> NewTieredCache(
74 const size_t mem_size
, const PersistentCacheConfig
& opt
) {
75 std::unique_ptr
<PersistentTieredCache
> tcache(new PersistentTieredCache());
76 // create primary tier
79 std::shared_ptr
<PersistentCacheTier
>(new VolatileCacheTier(mem_size
));
80 tcache
->AddTier(pcache
);
81 // create secondary tier
82 auto scache
= std::shared_ptr
<PersistentCacheTier
>(new BlockCacheTier(opt
));
83 tcache
->AddTier(scache
);
85 Status s
= tcache
->Open();
90 std::unique_ptr
<PersistentTieredCache
> NewTieredCache() {
91 std::shared_ptr
<Logger
> log
;
92 if (!Env::Default()->NewLogger(FLAGS_log_path
, &log
).ok()) {
93 fprintf(stderr
, "Error creating log %s \n", FLAGS_log_path
.c_str());
97 auto pct
= FLAGS_volatile_cache_pct
/ static_cast<double>(100);
98 PersistentCacheConfig
opt(Env::Default(), FLAGS_path
,
99 (1 - pct
) * FLAGS_cache_size
, log
);
100 opt
.writer_dispatch_size
= FLAGS_writer_iosize
;
101 opt
.writer_qdepth
= FLAGS_writer_qdepth
;
102 opt
.pipeline_writes
= FLAGS_enable_pipelined_writes
;
103 opt
.max_write_pipeline_backlog_size
= std::numeric_limits
<uint64_t>::max();
104 return NewTieredCache(FLAGS_cache_size
* pct
, opt
);
110 class CacheTierBenchmark
{
112 explicit CacheTierBenchmark(std::shared_ptr
<PersistentCacheTier
>&& cache
)
114 if (FLAGS_nthread_read
) {
115 fprintf(stdout
, "Pre-populating\n");
117 fprintf(stdout
, "Pre-population completed\n");
123 std::list
<port::Thread
> threads
;
124 Spawn(FLAGS_nthread_write
, &threads
,
125 std::bind(&CacheTierBenchmark::Write
, this));
126 Spawn(FLAGS_nthread_read
, &threads
,
127 std::bind(&CacheTierBenchmark::Read
, this));
129 // Wait till FLAGS_nsec and then signal to quit
130 StopWatchNano
t(SystemClock::Default().get(), /*auto_start=*/true);
131 size_t sec
= t
.ElapsedNanos() / 1000000000ULL;
133 sec
= t
.ElapsedNanos() / 1000000000ULL;
134 quit_
= sec
> size_t(FLAGS_nsec
);
135 /* sleep override */ sleep(1);
138 // Wait for threads to exit
143 cache_
->TEST_Flush();
148 void PrintStats(const size_t sec
) {
149 std::ostringstream msg
;
150 msg
<< "Test stats" << std::endl
151 << "* Elapsed: " << sec
<< " s" << std::endl
152 << "* Write Latency:" << std::endl
153 << stats_
.write_latency_
.ToString() << std::endl
154 << "* Read Latency:" << std::endl
155 << stats_
.read_latency_
.ToString() << std::endl
156 << "* Bytes written:" << std::endl
157 << stats_
.bytes_written_
.ToString() << std::endl
158 << "* Bytes read:" << std::endl
159 << stats_
.bytes_read_
.ToString() << std::endl
160 << "Cache stats:" << std::endl
161 << cache_
->PrintStats() << std::endl
;
162 fprintf(stderr
, "%s\n", msg
.str().c_str());
166 // Insert implementation and corresponding helper functions
169 for (uint64_t i
= 0; i
< 1024 * 1024; ++i
) {
175 // Wait until data is flushed
176 cache_
->TEST_Flush();
178 for (uint64_t i
= 0; i
< 1024 * 1024; ReadKey(i
++)) {
184 InsertKey(insert_key_limit_
++);
188 void InsertKey(const uint64_t key
) {
191 Slice block_key
= FillKey(k
, key
);
194 auto block
= NewBlock(key
);
197 StopWatchNano
timer(SystemClock::Default().get(), /*auto_start=*/true);
199 Status status
= cache_
->Insert(block_key
, block
.get(), FLAGS_iosize
);
204 // transient error is possible if we run without pipelining
205 assert(!FLAGS_enable_pipelined_writes
);
209 const size_t elapsed_micro
= timer
.ElapsedNanos() / 1000;
210 stats_
.write_latency_
.Add(elapsed_micro
);
211 stats_
.bytes_written_
.Add(FLAGS_iosize
);
215 // Read implementation
219 ReadKey(random() % read_key_limit_
);
223 void ReadKey(const uint64_t val
) {
226 Slice key
= FillKey(k
, val
);
229 StopWatchNano
timer(SystemClock::Default().get(), /*auto_start=*/true);
230 std::unique_ptr
<char[]> block
;
232 Status status
= cache_
->Lookup(key
, &block
, &size
);
234 fprintf(stderr
, "%s\n", status
.ToString().c_str());
237 assert(size
== (size_t)FLAGS_iosize
);
240 const size_t elapsed_micro
= timer
.ElapsedNanos() / 1000;
241 stats_
.read_latency_
.Add(elapsed_micro
);
242 stats_
.bytes_read_
.Add(FLAGS_iosize
);
245 if (!FLAGS_benchmark
) {
246 auto expected_block
= NewBlock(val
);
247 assert(memcmp(block
.get(), expected_block
.get(), FLAGS_iosize
) == 0);
251 // create data for a key by filling with a certain pattern
252 std::unique_ptr
<char[]> NewBlock(const uint64_t val
) {
253 std::unique_ptr
<char[]> data(new char[FLAGS_iosize
]);
254 memset(data
.get(), val
% 255, FLAGS_iosize
);
259 void Spawn(const size_t n
, std::list
<port::Thread
>* threads
,
260 const std::function
<void()>& fn
) {
261 for (size_t i
= 0; i
< n
; ++i
) {
262 threads
->emplace_back(fn
);
267 void Join(std::list
<port::Thread
>* threads
) {
268 for (auto& th
: *threads
) {
274 Slice
FillKey(uint64_t (&k
)[3], const uint64_t val
) {
277 void* p
= static_cast<void*>(&k
);
278 return Slice(static_cast<char*>(p
), sizeof(k
));
284 bytes_written_
.Clear();
286 read_latency_
.Clear();
287 write_latency_
.Clear();
290 HistogramImpl bytes_written_
;
291 HistogramImpl bytes_read_
;
292 HistogramImpl read_latency_
;
293 HistogramImpl write_latency_
;
296 std::shared_ptr
<PersistentCacheTier
> cache_
; // cache implementation
297 std::atomic
<uint64_t> insert_key_limit_
{0}; // data inserted upto
298 std::atomic
<uint64_t> read_key_limit_
{0}; // data can be read safely upto
299 bool quit_
= false; // Quit thread ?
300 mutable Stats stats_
; // Stats
303 } // namespace ROCKSDB_NAMESPACE
308 int main(int argc
, char** argv
) {
309 GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
310 std::string(argv
[0]) + " [OPTIONS]...");
311 GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc
, &argv
, false);
313 std::ostringstream msg
;
314 msg
<< "Config" << std::endl
315 << "======" << std::endl
316 << "* nsec=" << FLAGS_nsec
<< std::endl
317 << "* nthread_write=" << FLAGS_nthread_write
<< std::endl
318 << "* path=" << FLAGS_path
<< std::endl
319 << "* cache_size=" << FLAGS_cache_size
<< std::endl
320 << "* iosize=" << FLAGS_iosize
<< std::endl
321 << "* writer_iosize=" << FLAGS_writer_iosize
<< std::endl
322 << "* writer_qdepth=" << FLAGS_writer_qdepth
<< std::endl
323 << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
325 << "* cache_type=" << FLAGS_cache_type
<< std::endl
326 << "* benchmark=" << FLAGS_benchmark
<< std::endl
327 << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct
<< std::endl
;
329 fprintf(stderr
, "%s\n", msg
.str().c_str());
331 std::shared_ptr
<ROCKSDB_NAMESPACE::PersistentCacheTier
> cache
;
332 if (FLAGS_cache_type
== "block_cache") {
333 fprintf(stderr
, "Using block cache implementation\n");
334 cache
= ROCKSDB_NAMESPACE::NewBlockCache();
335 } else if (FLAGS_cache_type
== "volatile") {
336 fprintf(stderr
, "Using volatile cache implementation\n");
337 cache
= ROCKSDB_NAMESPACE::NewVolatileCache();
338 } else if (FLAGS_cache_type
== "tiered") {
339 fprintf(stderr
, "Using tiered cache implementation\n");
340 cache
= ROCKSDB_NAMESPACE::NewTieredCache();
342 fprintf(stderr
, "Unknown option for cache\n");
347 fprintf(stderr
, "Error creating cache\n");
351 std::unique_ptr
<ROCKSDB_NAMESPACE::CacheTierBenchmark
> benchmark(
352 new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache
)));
356 #endif // #ifndef GFLAGS
// Stub entry point for build configurations where the benchmark is
// compiled out entirely.
int main(int, char**) { return 0; }