1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
10 fprintf(stderr
, "Please install gflags to run rocksdb tools\n");
18 #include "db/write_batch_internal.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/types.h"
21 #include "util/gflags_compat.h"
22 #include "util/testutil.h"
24 // Run a thread to perform Put's.
25 // Another thread uses GetUpdatesSince API to keep getting the updates.
27 // --num_inserts = the num of inserts the first thread should perform.
28 // --wal_ttl = the wal ttl for the run.
30 using namespace rocksdb
;
32 using GFLAGS_NAMESPACE::ParseCommandLineFlags
;
33 using GFLAGS_NAMESPACE::SetUsageMessage
;
35 struct DataPumpThread
{
37 DB
* db
; // Assumption DB is Open'ed already.
40 static std::string
RandomString(Random
* rnd
, int len
) {
42 test::RandomString(rnd
, len
, &r
);
46 static void DataPumpThreadBody(void* arg
) {
47 DataPumpThread
* t
= reinterpret_cast<DataPumpThread
*>(arg
);
51 while (i
++ < t
->no_records
) {
52 if (!db
->Put(WriteOptions(), Slice(RandomString(&rnd
, 500)),
53 Slice(RandomString(&rnd
, 500)))
55 fprintf(stderr
, "Error in put\n");
61 struct ReplicationThread
{
62 std::atomic
<bool> stop
;
64 volatile size_t no_read
;
67 static void ReplicationThreadBody(void* arg
) {
68 ReplicationThread
* t
= reinterpret_cast<ReplicationThread
*>(arg
);
70 std::unique_ptr
<TransactionLogIterator
> iter
;
71 SequenceNumber currentSeqNum
= 1;
72 while (!t
->stop
.load(std::memory_order_acquire
)) {
75 while (!db
->GetUpdatesSince(currentSeqNum
, &iter
).ok()) {
76 if (t
->stop
.load(std::memory_order_acquire
)) {
80 fprintf(stderr
, "Refreshing iterator\n");
81 for (; iter
->Valid(); iter
->Next(), t
->no_read
++, currentSeqNum
++) {
82 BatchResult res
= iter
->GetBatch();
83 if (res
.sequence
!= currentSeqNum
) {
84 fprintf(stderr
, "Missed a seq no. b/w %ld and %ld\n",
85 (long)currentSeqNum
, (long)res
.sequence
);
92 DEFINE_uint64(num_inserts
, 1000,
93 "the num of inserts the first thread should"
95 DEFINE_uint64(wal_ttl_seconds
, 1000, "the wal ttl for the run(in seconds)");
96 DEFINE_uint64(wal_size_limit_MB
, 10,
97 "the wal size limit for the run"
100 int main(int argc
, const char** argv
) {
102 std::string("\nUSAGE:\n") + std::string(argv
[0]) +
103 " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
104 " --wal_size_limit_MB=<WAL_size_limit_MB>");
105 ParseCommandLineFlags(&argc
, const_cast<char***>(&argv
), true);
107 Env
* env
= Env::Default();
108 std::string default_db_path
;
109 env
->GetTestDirectory(&default_db_path
);
110 default_db_path
+= "db_repl_stress";
112 options
.create_if_missing
= true;
113 options
.WAL_ttl_seconds
= FLAGS_wal_ttl_seconds
;
114 options
.WAL_size_limit_MB
= FLAGS_wal_size_limit_MB
;
116 DestroyDB(default_db_path
, options
);
118 Status s
= DB::Open(options
, default_db_path
, &db
);
121 fprintf(stderr
, "Could not open DB due to %s\n", s
.ToString().c_str());
125 DataPumpThread dataPump
;
126 dataPump
.no_records
= FLAGS_num_inserts
;
128 env
->StartThread(DataPumpThreadBody
, &dataPump
);
130 ReplicationThread replThread
;
132 replThread
.no_read
= 0;
133 replThread
.stop
.store(false, std::memory_order_release
);
135 env
->StartThread(ReplicationThreadBody
, &replThread
);
136 while (replThread
.no_read
< FLAGS_num_inserts
)
138 replThread
.stop
.store(true, std::memory_order_release
);
139 if (replThread
.no_read
< dataPump
.no_records
) {
140 // no. read should be => than inserted.
142 "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
143 " Written : %" ROCKSDB_PRIszt
"\n",
144 replThread
.no_read
, dataPump
.no_records
);
147 fprintf(stderr
, "Successful!\n");
153 #else // ROCKSDB_LITE
155 int main(int /*argc*/, char** /*argv*/) {
156 fprintf(stderr
, "Not supported in lite mode.\n");
159 #endif // ROCKSDB_LITE