//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE

#ifndef GFLAGS
#include <cstdio>
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
#include <atomic>
#include <functional>
#include <limits>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "rocksdb/env.h"

#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

#include "monitoring/histogram.h"
#include "port/port.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
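// The DEFINE_* macros and GFLAGS_NAMESPACE used below come from
// util/gflags_compat.h, which abstracts over the google:: vs. gflags::
// namespace difference across gflags versions.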
DEFINE_int32(nsec, 10, "Duration of the test in seconds");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
DEFINE_bool(benchmark, false, "Benchmark mode");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
namespace ROCKSDB_NAMESPACE {

std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
  std::unique_ptr<PersistentCacheTier> pcache(
      new VolatileCacheTier(FLAGS_cache_size));
  return pcache;
}
std::unique_ptr<PersistentCacheTier> NewBlockCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    return nullptr;
  }

  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
  Status status = cache->Open();
  assert(status.ok());
  return cache;
}
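// A note on the writer knobs set above (roughly): writer_dispatch_size is the
// IO size the cache-file writer dispatches per write, writer_qdepth is the
// number of concurrent writers draining the queue, and pipeline_writes makes
// Insert() enqueue work for a background pipeline instead of writing
// synchronously ("async writes" per the flag description).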
// Construct a tiered RAM+block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache =
      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}
std::unique_ptr<PersistentTieredCache> NewTieredCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    return nullptr;
  }

  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
                            (1 - pct) * FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  return NewTieredCache(FLAGS_cache_size * pct, opt);
}
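// With the flag defaults above, the tiered setup gives volatile_cache_pct
// (10%) of --cache_size to the RAM tier and the remaining 90% to the block
// (disk) tier.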
//
// Benchmark driver
//
class CacheTierBenchmark {
 public:
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(cache) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(Env::Default(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > size_t(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }

 private:
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats" << std::endl
        << "* Elapsed: " << sec << " s" << std::endl
        << "* Write Latency:" << std::endl
        << stats_.write_latency_.ToString() << std::endl
        << "* Read Latency:" << std::endl
        << stats_.read_latency_.ToString() << std::endl
        << "* Bytes written:" << std::endl
        << stats_.bytes_written_.ToString() << std::endl
        << "* Bytes read:" << std::endl
        << stats_.bytes_read_.ToString() << std::endl
        << "Cache stats:" << std::endl
        << cache_->PrintStats() << std::endl;
    fprintf(stderr, "%s\n", msg.str().c_str());
  }
  //
  // Insert implementation and corresponding helper functions
  //
  void Prepop() {
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();
    // warm up the cache
    for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) {
    }
  }

  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }
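  // Pre-population is what makes the read path safe: ReadKey() picks keys as
  // random() % read_key_limit_, which would be a division by zero before the
  // first insert. Hence the constructor pre-populates whenever nthread_read
  // is non-zero.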
  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }

      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }
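  // A failed Insert() above is simply retried until it succeeds; per the
  // assert, such transient failures are only expected when pipelined writes
  // are disabled. Latencies are recorded in microseconds.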
  //
  // Read implementation
  //
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size;
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == static_cast<size_t>(FLAGS_iosize));

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }
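  // Verification is on by default: unless --benchmark is passed, every lookup
  // re-materializes the expected block and memcmp()s it. That happens after
  // the latency sample is taken, so it does not skew read latency, but it
  // does cost throughput.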
  // create data for a key by filling with a certain pattern
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }
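  // The fill pattern is deterministic in the key, so ReadKey() can
  // reconstruct the expected bytes. It repeats every 255 keys, which is
  // enough to catch most swapped or corrupted blocks in a microbenchmark.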
  // spawn threads
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }
  // construct key
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }
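  // Keys are thus 24-byte binary blobs (two zero words followed by the key
  // value), exposed as a Slice over the caller's stack array; the Slice must
  // not outlive the call that supplied k.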
  // benchmark stats
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };
  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted up to here
  std::atomic<uint64_t> read_key_limit_{0};     // data readable up to here
  bool quit_ = false;                           // signal for threads to quit
  mutable Stats stats_;                         // benchmark stats
};

}  // namespace ROCKSDB_NAMESPACE
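// Example invocation (binary name and sizes are illustrative; volatile and
// tiered modes need an explicit --cache_size):
//
//   persistent_cache_bench --cache_type=tiered --cache_size=1073741824 \
//       --nthread_write=4 --nthread_read=8 --nsec=30 --benchmark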
//
// main
//
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
                                    std::string(argv[0]) + " [OPTIONS]...");
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);
  std::ostringstream msg;
  msg << "Config" << std::endl
      << "======" << std::endl
      << "* nsec=" << FLAGS_nsec << std::endl
      << "* nthread_write=" << FLAGS_nthread_write << std::endl
      << "* nthread_read=" << FLAGS_nthread_read << std::endl
      << "* path=" << FLAGS_path << std::endl
      << "* cache_size=" << FLAGS_cache_size << std::endl
      << "* iosize=" << FLAGS_iosize << std::endl
      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
      << std::endl
      << "* cache_type=" << FLAGS_cache_type << std::endl
      << "* benchmark=" << FLAGS_benchmark << std::endl
      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;
  fprintf(stderr, "%s\n", msg.str().c_str());
  std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
  if (FLAGS_cache_type == "block_cache") {
    fprintf(stderr, "Using block cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewBlockCache();
  } else if (FLAGS_cache_type == "volatile") {
    fprintf(stderr, "Using volatile cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewVolatileCache();
  } else if (FLAGS_cache_type == "tiered") {
    fprintf(stderr, "Using tiered cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewTieredCache();
  } else {
    fprintf(stderr, "Unknown option for cache\n");
  }

  assert(cache);
  if (!cache) {
    fprintf(stderr, "Error creating cache\n");
    abort();
  }

  std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
      new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));

  return 0;
}
#endif  // #ifndef GFLAGS
#else   // ROCKSDB_LITE
int main(int, char**) { return 0; }
#endif  // ROCKSDB_LITE