]>
Commit | Line | Data |
---|---|---|
494da23a TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | // How to use this example | |
7 | // Open two terminals. In one of them, run `./multi_processes_example 0` to | |
8 | // start a process running the primary instance. This will destroy any existing | |
9 | // DB in kDBPath and create a new one. The process will run for a while, | |
10 | // inserting keys into that RocksDB database. | |
11 | // Next, go to the other terminal and run `./multi_processes_example 1` to | |
12 | // start a process running the secondary instance. This will create a secondary | |
13 | // instance following the aforementioned primary instance. This process will | |
14 | // run for a while, tailing the logs of the primary. After the process running | |
15 | // the primary instance exits, this process keeps running until you hit CTRL+C. | |
16 | ||
494da23a | 17 | #include <chrono> |
f67539c2 | 18 | #include <cinttypes> |
494da23a TL |
19 | #include <cstdio> |
20 | #include <cstdlib> | |
21 | #include <ctime> | |
22 | #include <string> | |
23 | #include <thread> | |
24 | #include <vector> | |
25 | ||
20effc67 TL |
26 | // TODO: port this example to other systems. It should be straightforward for |
27 | // POSIX-compliant systems. | |
494da23a TL |
28 | #if defined(OS_LINUX) |
29 | #include <dirent.h> | |
30 | #include <signal.h> | |
31 | #include <sys/stat.h> | |
32 | #include <sys/types.h> | |
33 | #include <sys/wait.h> | |
34 | #include <unistd.h> | |
494da23a TL |
35 | |
36 | #include "rocksdb/db.h" | |
37 | #include "rocksdb/options.h" | |
38 | #include "rocksdb/slice.h" | |
39 | ||
f67539c2 TL |
40 | using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor; |
41 | using ROCKSDB_NAMESPACE::ColumnFamilyHandle; | |
42 | using ROCKSDB_NAMESPACE::ColumnFamilyOptions; | |
43 | using ROCKSDB_NAMESPACE::DB; | |
44 | using ROCKSDB_NAMESPACE::FlushOptions; | |
45 | using ROCKSDB_NAMESPACE::Iterator; | |
46 | using ROCKSDB_NAMESPACE::Options; | |
47 | using ROCKSDB_NAMESPACE::ReadOptions; | |
48 | using ROCKSDB_NAMESPACE::Slice; | |
49 | using ROCKSDB_NAMESPACE::Status; | |
50 | using ROCKSDB_NAMESPACE::WriteOptions; | |
494da23a TL |
51 | |
// Directory of the primary database shared by both processes.
const std::string kDBPath{"/tmp/rocksdb_multi_processes_example"};
// Status-file path for the primary; not referenced elsewhere in this example.
const std::string kPrimaryStatusFile{
    "/tmp/rocksdb_multi_processes_example_primary_status"};
// The primary writes keys [0, kMaxKey); the secondary looks up keys in range.
constexpr uint64_t kMaxKey = 600000;
// Upper bound (inclusive) on the length of each generated value.
constexpr size_t kMaxValueLength = 256;
// Number of keys written to each column family between flushes.
constexpr size_t kNumKeysPerFlush = 1000;
58 | ||
59 | const std::vector<std::string>& GetColumnFamilyNames() { | |
60 | static std::vector<std::string> column_family_names = { | |
f67539c2 | 61 | ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"}; |
494da23a TL |
62 | return column_family_names; |
63 | } | |
64 | ||
// Reports whether the host stores integers least-significant byte first.
// It does so by looking at the lowest-addressed byte of the 32-bit value 1,
// which is non-zero only on little-endian hosts.
inline bool IsLittleEndian() {
  const uint32_t probe = 1;
  const unsigned char lowest_addressed_byte =
      *reinterpret_cast<const unsigned char*>(&probe);
  return lowest_addressed_byte != 0;
}
69 | ||
// Process-wide run flag polled by the secondary's loops. It is 1 while they
// should keep going, and the SIGINT handler stores 0 to stop them.
// The flag is a function-local static, initialized to 1 on first use.
static std::atomic<int>& ShouldSecondaryWait() {
  static std::atomic<int> keep_running(1);
  return keep_running;
}
74 | ||
// Encodes k as exactly 8 bytes, most significant byte first (big-endian).
// With this encoding, lexicographic byte order of the keys matches numeric
// order of k. The result does not depend on the host's byte order.
static std::string Key(uint64_t k) {
  std::string encoded(sizeof(k), '\0');
  // Fill from the last position backwards, peeling off the low byte each step.
  for (size_t pos = encoded.size(); pos-- > 0; k >>= 8) {
    encoded[pos] = static_cast<char>(k & 0xff);
  }
  return encoded;
}
101 | ||
// Decodes an 8-byte big-endian key (as produced by Key(uint64_t)) back into
// its integer value.
// Each byte is read as unsigned char. A signed plain char would sign-extend
// bytes >= 0x80 and corrupt the result. The decode is also independent of
// the host's byte order, so no endianness branch is needed.
static uint64_t Key(const std::string& key) {
  assert(key.size() == sizeof(uint64_t));
  uint64_t ret = 0;
  for (const char byte : key) {
    ret = (ret << 8) | static_cast<unsigned char>(byte);
  }
  return ret;
}
128 | ||
129 | static Slice GenerateRandomValue(const size_t max_length, char scratch[]) { | |
130 | size_t sz = 1 + (std::rand() % max_length); | |
131 | int rnd = std::rand(); | |
132 | for (size_t i = 0; i != sz; ++i) { | |
133 | scratch[i] = static_cast<char>(rnd ^ i); | |
134 | } | |
135 | return Slice(scratch, sz); | |
136 | } | |
137 | ||
// Policy hook for RunPrimary(). When it returns true, the primary closes and
// reopens the DB after each batch of inserts and flushes. Currently always true.
static bool ShouldCloseDB() {
  constexpr bool kCloseAfterEachBatch = true;
  return kCloseAfterEachBatch;
}
139 | ||
494da23a TL |
140 | void CreateDB() { |
141 | long my_pid = static_cast<long>(getpid()); | |
142 | Options options; | |
f67539c2 | 143 | Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options); |
494da23a TL |
144 | if (!s.ok()) { |
145 | fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid, | |
146 | s.ToString().c_str()); | |
147 | assert(false); | |
148 | } | |
149 | options.create_if_missing = true; | |
150 | DB* db = nullptr; | |
151 | s = DB::Open(options, kDBPath, &db); | |
152 | if (!s.ok()) { | |
153 | fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, | |
154 | s.ToString().c_str()); | |
155 | assert(false); | |
156 | } | |
157 | std::vector<ColumnFamilyHandle*> handles; | |
158 | ColumnFamilyOptions cf_opts(options); | |
159 | for (const auto& cf_name : GetColumnFamilyNames()) { | |
f67539c2 | 160 | if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) { |
494da23a TL |
161 | ColumnFamilyHandle* handle = nullptr; |
162 | s = db->CreateColumnFamily(cf_opts, cf_name, &handle); | |
163 | if (!s.ok()) { | |
164 | fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid, | |
165 | cf_name.c_str(), s.ToString().c_str()); | |
166 | assert(false); | |
167 | } | |
168 | handles.push_back(handle); | |
169 | } | |
170 | } | |
171 | fprintf(stdout, "[process %ld] Column families created\n", my_pid); | |
172 | for (auto h : handles) { | |
173 | delete h; | |
174 | } | |
175 | handles.clear(); | |
176 | delete db; | |
177 | } | |
178 | ||
179 | void RunPrimary() { | |
180 | long my_pid = static_cast<long>(getpid()); | |
181 | fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid); | |
182 | CreateDB(); | |
183 | std::srand(time(nullptr)); | |
184 | DB* db = nullptr; | |
185 | Options options; | |
186 | options.create_if_missing = false; | |
187 | std::vector<ColumnFamilyDescriptor> column_families; | |
188 | for (const auto& cf_name : GetColumnFamilyNames()) { | |
189 | column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); | |
190 | } | |
191 | std::vector<ColumnFamilyHandle*> handles; | |
192 | WriteOptions write_opts; | |
193 | char val_buf[kMaxValueLength] = {0}; | |
194 | uint64_t curr_key = 0; | |
195 | while (curr_key < kMaxKey) { | |
196 | Status s; | |
197 | if (nullptr == db) { | |
198 | s = DB::Open(options, kDBPath, column_families, &handles, &db); | |
199 | if (!s.ok()) { | |
200 | fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, | |
201 | s.ToString().c_str()); | |
202 | assert(false); | |
203 | } | |
204 | } | |
205 | assert(nullptr != db); | |
206 | assert(handles.size() == GetColumnFamilyNames().size()); | |
207 | for (auto h : handles) { | |
208 | assert(nullptr != h); | |
209 | for (size_t i = 0; i != kNumKeysPerFlush; ++i) { | |
210 | Slice key = Key(curr_key + static_cast<uint64_t>(i)); | |
211 | Slice value = GenerateRandomValue(kMaxValueLength, val_buf); | |
212 | s = db->Put(write_opts, h, key, value); | |
213 | if (!s.ok()) { | |
214 | fprintf(stderr, "[process %ld] Failed to insert\n", my_pid); | |
215 | assert(false); | |
216 | } | |
217 | } | |
218 | s = db->Flush(FlushOptions(), h); | |
219 | if (!s.ok()) { | |
220 | fprintf(stderr, "[process %ld] Failed to flush\n", my_pid); | |
221 | assert(false); | |
222 | } | |
223 | } | |
224 | curr_key += static_cast<uint64_t>(kNumKeysPerFlush); | |
225 | if (ShouldCloseDB()) { | |
226 | for (auto h : handles) { | |
227 | delete h; | |
228 | } | |
229 | handles.clear(); | |
230 | delete db; | |
231 | db = nullptr; | |
232 | } | |
233 | } | |
234 | if (nullptr != db) { | |
235 | for (auto h : handles) { | |
236 | delete h; | |
237 | } | |
238 | handles.clear(); | |
239 | delete db; | |
240 | db = nullptr; | |
241 | } | |
242 | fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid); | |
243 | } | |
244 | ||
245 | void secondary_instance_sigint_handler(int signal) { | |
246 | ShouldSecondaryWait().store(0, std::memory_order_relaxed); | |
247 | fprintf(stdout, "\n"); | |
248 | fflush(stdout); | |
249 | }; | |
250 | ||
251 | void RunSecondary() { | |
252 | ::signal(SIGINT, secondary_instance_sigint_handler); | |
253 | long my_pid = static_cast<long>(getpid()); | |
254 | const std::string kSecondaryPath = | |
255 | "/tmp/rocksdb_multi_processes_example_secondary"; | |
256 | // Create directory if necessary | |
257 | if (nullptr == opendir(kSecondaryPath.c_str())) { | |
258 | int ret = | |
259 | mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); | |
260 | if (ret < 0) { | |
261 | perror("failed to create directory for secondary instance"); | |
262 | exit(0); | |
263 | } | |
264 | } | |
265 | DB* db = nullptr; | |
266 | Options options; | |
267 | options.create_if_missing = false; | |
268 | options.max_open_files = -1; | |
269 | Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db); | |
270 | if (!s.ok()) { | |
271 | fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n", | |
272 | my_pid, s.ToString().c_str()); | |
273 | assert(false); | |
274 | } else { | |
275 | fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid); | |
276 | } | |
277 | ||
278 | ReadOptions ropts; | |
279 | ropts.verify_checksums = true; | |
280 | ropts.total_order_seek = true; | |
281 | ||
282 | std::vector<std::thread> test_threads; | |
283 | test_threads.emplace_back([&]() { | |
284 | while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { | |
285 | std::unique_ptr<Iterator> iter(db->NewIterator(ropts)); | |
286 | iter->SeekToFirst(); | |
287 | size_t count = 0; | |
288 | for (; iter->Valid(); iter->Next()) { | |
289 | ++count; | |
290 | } | |
291 | } | |
292 | fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid); | |
293 | }); | |
294 | ||
295 | test_threads.emplace_back([&]() { | |
296 | std::srand(time(nullptr)); | |
297 | while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { | |
298 | Slice key = Key(std::rand() % kMaxKey); | |
299 | std::string value; | |
300 | db->Get(ropts, key, &value); | |
301 | } | |
20effc67 | 302 | fprintf(stdout, "[process %ld] Point lookup thread finished\n", my_pid); |
494da23a TL |
303 | }); |
304 | ||
305 | uint64_t curr_key = 0; | |
306 | while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { | |
307 | s = db->TryCatchUpWithPrimary(); | |
308 | if (!s.ok()) { | |
309 | fprintf(stderr, | |
310 | "[process %ld] error while trying to catch up with " | |
311 | "primary %s\n", | |
312 | my_pid, s.ToString().c_str()); | |
313 | assert(false); | |
314 | } | |
315 | { | |
316 | std::unique_ptr<Iterator> iter(db->NewIterator(ropts)); | |
317 | if (!iter) { | |
318 | fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid); | |
319 | assert(false); | |
320 | } | |
321 | iter->SeekToLast(); | |
322 | if (iter->Valid()) { | |
323 | uint64_t curr_max_key = Key(iter->key().ToString()); | |
324 | if (curr_max_key != curr_key) { | |
325 | fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid, | |
326 | curr_key); | |
327 | curr_key = curr_max_key; | |
328 | } | |
329 | } | |
330 | } | |
331 | std::this_thread::sleep_for(std::chrono::seconds(1)); | |
332 | } | |
333 | s = db->TryCatchUpWithPrimary(); | |
334 | if (!s.ok()) { | |
335 | fprintf(stderr, | |
336 | "[process %ld] error while trying to catch up with " | |
337 | "primary %s\n", | |
338 | my_pid, s.ToString().c_str()); | |
339 | assert(false); | |
340 | } | |
341 | ||
342 | std::vector<ColumnFamilyDescriptor> column_families; | |
343 | for (const auto& cf_name : GetColumnFamilyNames()) { | |
344 | column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); | |
345 | } | |
346 | std::vector<ColumnFamilyHandle*> handles; | |
347 | DB* verification_db = nullptr; | |
348 | s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles, | |
349 | &verification_db); | |
350 | assert(s.ok()); | |
351 | Iterator* iter1 = verification_db->NewIterator(ropts); | |
352 | iter1->SeekToFirst(); | |
353 | ||
354 | Iterator* iter = db->NewIterator(ropts); | |
355 | iter->SeekToFirst(); | |
356 | for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) { | |
357 | if (iter->key().ToString() != iter1->key().ToString()) { | |
358 | fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n", | |
359 | Key(iter->key().ToString()), Key(iter1->key().ToString())); | |
360 | assert(false); | |
361 | } else if (iter->value().ToString() != iter1->value().ToString()) { | |
362 | fprintf(stderr, "Value mismatch\n"); | |
363 | assert(false); | |
364 | } | |
365 | } | |
366 | fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid); | |
367 | for (auto& thr : test_threads) { | |
368 | thr.join(); | |
369 | } | |
370 | delete iter; | |
371 | delete iter1; | |
372 | delete db; | |
373 | delete verification_db; | |
374 | } | |
375 | ||
376 | int main(int argc, char** argv) { | |
377 | if (argc < 2) { | |
378 | fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]); | |
379 | return 0; | |
380 | } | |
381 | if (atoi(argv[1]) == 0) { | |
382 | RunPrimary(); | |
383 | } else { | |
384 | RunSecondary(); | |
385 | } | |
386 | return 0; | |
387 | } | |
388 | #else // OS_LINUX | |
// Fallback entry point for builds without OS_LINUX. The example relies on
// Linux/POSIX APIs (see the TODO above), so it only reports that it is
// unsupported and exits with status 0.
int main() {
  fputs("Not implemented.\n", stderr);
  return 0;
}
393 | #endif // !OS_LINUX |