// --num_inserts = the num of inserts the first thread should perform.
// --wal_ttl = the wal ttl for the run.
-using namespace ROCKSDB_NAMESPACE;
+DEFINE_uint64(num_inserts, 1000,
+ "the num of inserts the first thread should"
+ " perform.");
+DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
+DEFINE_uint64(wal_size_limit_MB, 10,
+ "the wal size limit for the run"
+ "(in MB)");
+
+using ROCKSDB_NAMESPACE::BatchResult;
+using ROCKSDB_NAMESPACE::DB;
+using ROCKSDB_NAMESPACE::DestroyDB;
+using ROCKSDB_NAMESPACE::Env;
+using ROCKSDB_NAMESPACE::Options;
+using ROCKSDB_NAMESPACE::Random;
+using ROCKSDB_NAMESPACE::SequenceNumber;
+using ROCKSDB_NAMESPACE::Slice;
+using ROCKSDB_NAMESPACE::Status;
+using ROCKSDB_NAMESPACE::TransactionLogIterator;
+using ROCKSDB_NAMESPACE::WriteOptions;
using GFLAGS_NAMESPACE::ParseCommandLineFlags;
using GFLAGS_NAMESPACE::SetUsageMessage;
struct DataPumpThread {
- size_t no_records;
DB* db; // Assumption DB is Open'ed already.
};
DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
DB* db = t->db;
Random rnd(301);
- size_t i = 0;
- while (i++ < t->no_records) {
+ uint64_t i = 0;
+ while (i++ < FLAGS_num_inserts) {
if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
Slice(rnd.RandomString(500)))
.ok()) {
}
}
-struct ReplicationThread {
- std::atomic<bool> stop;
- DB* db;
- volatile size_t no_read;
-};
-
-static void ReplicationThreadBody(void* arg) {
- ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
- DB* db = t->db;
- std::unique_ptr<TransactionLogIterator> iter;
- SequenceNumber currentSeqNum = 1;
- while (!t->stop.load(std::memory_order_acquire)) {
- iter.reset();
- Status s;
- while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
- if (t->stop.load(std::memory_order_acquire)) {
- return;
- }
- }
- fprintf(stderr, "Refreshing iterator\n");
- for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
- BatchResult res = iter->GetBatch();
- if (res.sequence != currentSeqNum) {
- fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
- (long)currentSeqNum, (long)res.sequence);
- exit(1);
- }
- }
- }
-}
-
-DEFINE_uint64(num_inserts, 1000,
- "the num of inserts the first thread should"
- " perform.");
-DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
-DEFINE_uint64(wal_size_limit_MB, 10,
- "the wal size limit for the run"
- "(in MB)");
-
int main(int argc, const char** argv) {
SetUsageMessage(
std::string("\nUSAGE:\n") + std::string(argv[0]) +
}
DataPumpThread dataPump;
- dataPump.no_records = FLAGS_num_inserts;
dataPump.db = db;
env->StartThread(DataPumpThreadBody, &dataPump);
- ReplicationThread replThread;
- replThread.db = db;
- replThread.no_read = 0;
- replThread.stop.store(false, std::memory_order_release);
-
- env->StartThread(ReplicationThreadBody, &replThread);
- while (replThread.no_read < FLAGS_num_inserts)
- ;
- replThread.stop.store(true, std::memory_order_release);
- if (replThread.no_read < dataPump.no_records) {
- // no. read should be => than inserted.
- fprintf(stderr,
- "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
- " Written : %" ROCKSDB_PRIszt "\n",
- replThread.no_read, dataPump.no_records);
- exit(1);
+ std::unique_ptr<TransactionLogIterator> iter;
+ SequenceNumber currentSeqNum = 1;
+ uint64_t num_read = 0;
+ for (;;) {
+ iter.reset();
+ // Continue to probe a bit more after all received
+ size_t probes = 0;
+ while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
+ probes++;
+ if (probes > 100 && num_read >= FLAGS_num_inserts) {
+ if (num_read > FLAGS_num_inserts) {
+ fprintf(stderr, "Too many updates read: %ld expected: %ld\n",
+ (long)num_read, (long)FLAGS_num_inserts);
+ exit(1);
+ }
+ fprintf(stderr, "Successful!\n");
+ return 0;
+ }
+ }
+ fprintf(stderr, "Refreshing iterator\n");
+ for (; iter->Valid(); iter->Next(), num_read++, currentSeqNum++) {
+ BatchResult res = iter->GetBatch();
+ if (res.sequence != currentSeqNum) {
+ fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
+ (long)currentSeqNum, (long)res.sequence);
+ exit(1);
+ }
+ }
}
- fprintf(stderr, "Successful!\n");
- exit(0);
}
#endif // GFLAGS