]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/tools/db_repl_stress.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / tools / db_repl_stress.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7#ifndef GFLAGS
8#include <cstdio>
// Fallback entry point compiled when GFLAGS is not defined: the tool cannot
// parse its options, so it reports the missing dependency on stderr and
// returns a failure status.
int main() {
  fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
13#else
14
7c673cae 15#include <atomic>
11fdf7f2 16#include <cstdio>
7c673cae
FG
17
18#include "db/write_batch_internal.h"
19#include "rocksdb/db.h"
20#include "rocksdb/types.h"
f67539c2 21#include "test_util/testutil.h"
11fdf7f2 22#include "util/gflags_compat.h"
7c673cae
FG
23
// Run a thread to perform Puts.
// Another thread uses the GetUpdatesSince API to keep fetching the updates.
// Options:
// --num_inserts = the number of inserts the first thread should perform.
// --wal_ttl_seconds = the WAL TTL for the run (in seconds).
// --wal_size_limit_MB = the WAL size limit for the run (in MB).
29
f67539c2 30using namespace ROCKSDB_NAMESPACE;
7c673cae 31
11fdf7f2
TL
32using GFLAGS_NAMESPACE::ParseCommandLineFlags;
33using GFLAGS_NAMESPACE::SetUsageMessage;
7c673cae
FG
34
35struct DataPumpThread {
36 size_t no_records;
11fdf7f2 37 DB* db; // Assumption DB is Open'ed already.
7c673cae
FG
38};
39
7c673cae
FG
40static void DataPumpThreadBody(void* arg) {
41 DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
42 DB* db = t->db;
43 Random rnd(301);
44 size_t i = 0;
11fdf7f2 45 while (i++ < t->no_records) {
20effc67
TL
46 if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
47 Slice(rnd.RandomString(500)))
11fdf7f2 48 .ok()) {
7c673cae
FG
49 fprintf(stderr, "Error in put\n");
50 exit(1);
51 }
52 }
53}
54
55struct ReplicationThread {
56 std::atomic<bool> stop;
57 DB* db;
58 volatile size_t no_read;
59};
60
61static void ReplicationThreadBody(void* arg) {
62 ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
63 DB* db = t->db;
494da23a 64 std::unique_ptr<TransactionLogIterator> iter;
7c673cae
FG
65 SequenceNumber currentSeqNum = 1;
66 while (!t->stop.load(std::memory_order_acquire)) {
67 iter.reset();
68 Status s;
11fdf7f2 69 while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
7c673cae
FG
70 if (t->stop.load(std::memory_order_acquire)) {
71 return;
72 }
73 }
74 fprintf(stderr, "Refreshing iterator\n");
11fdf7f2 75 for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
7c673cae
FG
76 BatchResult res = iter->GetBatch();
77 if (res.sequence != currentSeqNum) {
11fdf7f2
TL
78 fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
79 (long)currentSeqNum, (long)res.sequence);
7c673cae
FG
80 exit(1);
81 }
82 }
83 }
84}
85
11fdf7f2
TL
86DEFINE_uint64(num_inserts, 1000,
87 "the num of inserts the first thread should"
7c673cae
FG
88 " perform.");
89DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
11fdf7f2
TL
90DEFINE_uint64(wal_size_limit_MB, 10,
91 "the wal size limit for the run"
7c673cae
FG
92 "(in MB)");
93
94int main(int argc, const char** argv) {
95 SetUsageMessage(
96 std::string("\nUSAGE:\n") + std::string(argv[0]) +
97 " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
98 " --wal_size_limit_MB=<WAL_size_limit_MB>");
99 ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
100
101 Env* env = Env::Default();
102 std::string default_db_path;
103 env->GetTestDirectory(&default_db_path);
104 default_db_path += "db_repl_stress";
105 Options options;
106 options.create_if_missing = true;
107 options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
108 options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
109 DB* db;
110 DestroyDB(default_db_path, options);
111
112 Status s = DB::Open(options, default_db_path, &db);
113
114 if (!s.ok()) {
115 fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
116 exit(1);
117 }
118
119 DataPumpThread dataPump;
120 dataPump.no_records = FLAGS_num_inserts;
121 dataPump.db = db;
122 env->StartThread(DataPumpThreadBody, &dataPump);
123
124 ReplicationThread replThread;
125 replThread.db = db;
126 replThread.no_read = 0;
127 replThread.stop.store(false, std::memory_order_release);
128
129 env->StartThread(ReplicationThreadBody, &replThread);
11fdf7f2
TL
130 while (replThread.no_read < FLAGS_num_inserts)
131 ;
7c673cae
FG
132 replThread.stop.store(true, std::memory_order_release);
133 if (replThread.no_read < dataPump.no_records) {
134 // no. read should be => than inserted.
135 fprintf(stderr,
136 "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
137 " Written : %" ROCKSDB_PRIszt "\n",
138 replThread.no_read, dataPump.no_records);
139 exit(1);
140 }
141 fprintf(stderr, "Successful!\n");
142 exit(0);
143}
144
145#endif // GFLAGS
146
147#else // ROCKSDB_LITE
148#include <stdio.h>
// Entry point for ROCKSDB_LITE builds: the tool is unavailable there, so it
// only says so on stderr and returns a failure status. The command-line
// arguments are ignored.
int main(int /*argc*/, char** /*argv*/) {
  fputs("Not supported in lite mode.\n", stderr);
  return 1;
}
153#endif // ROCKSDB_LITE