]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | #ifndef GFLAGS | |
8 | #include <cstdio> | |
9 | int main() { | |
10 | fprintf(stderr, "Please install gflags to run rocksdb tools\n"); | |
11 | return 1; | |
12 | } | |
13 | #else | |
14 | ||
7c673cae | 15 | #include <atomic> |
11fdf7f2 | 16 | #include <cstdio> |
7c673cae FG |
17 | |
18 | #include "db/write_batch_internal.h" | |
19 | #include "rocksdb/db.h" | |
20 | #include "rocksdb/types.h" | |
f67539c2 | 21 | #include "test_util/testutil.h" |
11fdf7f2 | 22 | #include "util/gflags_compat.h" |
7c673cae FG |
23 | |
24 | // Run a thread to perform Put's. | |
25 | // Another thread uses GetUpdatesSince API to keep getting the updates. | |
26 | // options : | |
27 | // --num_inserts = the num of inserts the first thread should perform. | |
28 | // --wal_ttl = the wal ttl for the run. | |
29 | ||
f67539c2 | 30 | using namespace ROCKSDB_NAMESPACE; |
7c673cae | 31 | |
11fdf7f2 TL |
32 | using GFLAGS_NAMESPACE::ParseCommandLineFlags; |
33 | using GFLAGS_NAMESPACE::SetUsageMessage; | |
7c673cae FG |
34 | |
35 | struct DataPumpThread { | |
36 | size_t no_records; | |
11fdf7f2 | 37 | DB* db; // Assumption DB is Open'ed already. |
7c673cae FG |
38 | }; |
39 | ||
7c673cae FG |
40 | static void DataPumpThreadBody(void* arg) { |
41 | DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg); | |
42 | DB* db = t->db; | |
43 | Random rnd(301); | |
44 | size_t i = 0; | |
11fdf7f2 | 45 | while (i++ < t->no_records) { |
20effc67 TL |
46 | if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)), |
47 | Slice(rnd.RandomString(500))) | |
11fdf7f2 | 48 | .ok()) { |
7c673cae FG |
49 | fprintf(stderr, "Error in put\n"); |
50 | exit(1); | |
51 | } | |
52 | } | |
53 | } | |
54 | ||
55 | struct ReplicationThread { | |
56 | std::atomic<bool> stop; | |
57 | DB* db; | |
58 | volatile size_t no_read; | |
59 | }; | |
60 | ||
61 | static void ReplicationThreadBody(void* arg) { | |
62 | ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg); | |
63 | DB* db = t->db; | |
494da23a | 64 | std::unique_ptr<TransactionLogIterator> iter; |
7c673cae FG |
65 | SequenceNumber currentSeqNum = 1; |
66 | while (!t->stop.load(std::memory_order_acquire)) { | |
67 | iter.reset(); | |
68 | Status s; | |
11fdf7f2 | 69 | while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) { |
7c673cae FG |
70 | if (t->stop.load(std::memory_order_acquire)) { |
71 | return; | |
72 | } | |
73 | } | |
74 | fprintf(stderr, "Refreshing iterator\n"); | |
11fdf7f2 | 75 | for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { |
7c673cae FG |
76 | BatchResult res = iter->GetBatch(); |
77 | if (res.sequence != currentSeqNum) { | |
11fdf7f2 TL |
78 | fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", |
79 | (long)currentSeqNum, (long)res.sequence); | |
7c673cae FG |
80 | exit(1); |
81 | } | |
82 | } | |
83 | } | |
84 | } | |
85 | ||
11fdf7f2 TL |
86 | DEFINE_uint64(num_inserts, 1000, |
87 | "the num of inserts the first thread should" | |
7c673cae FG |
88 | " perform."); |
89 | DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)"); | |
11fdf7f2 TL |
90 | DEFINE_uint64(wal_size_limit_MB, 10, |
91 | "the wal size limit for the run" | |
7c673cae FG |
92 | "(in MB)"); |
93 | ||
94 | int main(int argc, const char** argv) { | |
95 | SetUsageMessage( | |
96 | std::string("\nUSAGE:\n") + std::string(argv[0]) + | |
97 | " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" + | |
98 | " --wal_size_limit_MB=<WAL_size_limit_MB>"); | |
99 | ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true); | |
100 | ||
101 | Env* env = Env::Default(); | |
102 | std::string default_db_path; | |
103 | env->GetTestDirectory(&default_db_path); | |
104 | default_db_path += "db_repl_stress"; | |
105 | Options options; | |
106 | options.create_if_missing = true; | |
107 | options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; | |
108 | options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; | |
109 | DB* db; | |
110 | DestroyDB(default_db_path, options); | |
111 | ||
112 | Status s = DB::Open(options, default_db_path, &db); | |
113 | ||
114 | if (!s.ok()) { | |
115 | fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str()); | |
116 | exit(1); | |
117 | } | |
118 | ||
119 | DataPumpThread dataPump; | |
120 | dataPump.no_records = FLAGS_num_inserts; | |
121 | dataPump.db = db; | |
122 | env->StartThread(DataPumpThreadBody, &dataPump); | |
123 | ||
124 | ReplicationThread replThread; | |
125 | replThread.db = db; | |
126 | replThread.no_read = 0; | |
127 | replThread.stop.store(false, std::memory_order_release); | |
128 | ||
129 | env->StartThread(ReplicationThreadBody, &replThread); | |
11fdf7f2 TL |
130 | while (replThread.no_read < FLAGS_num_inserts) |
131 | ; | |
7c673cae FG |
132 | replThread.stop.store(true, std::memory_order_release); |
133 | if (replThread.no_read < dataPump.no_records) { | |
134 | // no. read should be => than inserted. | |
135 | fprintf(stderr, | |
136 | "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt | |
137 | " Written : %" ROCKSDB_PRIszt "\n", | |
138 | replThread.no_read, dataPump.no_records); | |
139 | exit(1); | |
140 | } | |
141 | fprintf(stderr, "Successful!\n"); | |
142 | exit(0); | |
143 | } | |
144 | ||
145 | #endif // GFLAGS | |
146 | ||
147 | #else // ROCKSDB_LITE | |
148 | #include <stdio.h> | |
11fdf7f2 | 149 | int main(int /*argc*/, char** /*argv*/) { |
7c673cae FG |
150 | fprintf(stderr, "Not supported in lite mode.\n"); |
151 | return 1; | |
152 | } | |
153 | #endif // ROCKSDB_LITE |