]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / cassandra / cassandra_functional_test.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include <iostream>
20effc67 7
f67539c2 8#include "db/db_impl/db_impl.h"
11fdf7f2 9#include "rocksdb/db.h"
11fdf7f2
TL
10#include "rocksdb/merge_operator.h"
11#include "rocksdb/utilities/db_ttl.h"
f67539c2 12#include "test_util/testharness.h"
20effc67 13#include "util/cast_util.h"
11fdf7f2 14#include "util/random.h"
11fdf7f2
TL
15#include "utilities/cassandra/cassandra_compaction_filter.h"
16#include "utilities/cassandra/merge_operator.h"
17#include "utilities/cassandra/test_utils.h"
f67539c2 18#include "utilities/merge_operators.h"
11fdf7f2 19
11fdf7f2 20
f67539c2 21namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
22namespace cassandra {
23
24// Path to the database on file system
25const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
26
27class CassandraStore {
28 public:
29 explicit CassandraStore(std::shared_ptr<DB> db)
30 : db_(db), write_option_(), get_option_() {
31 assert(db);
32 }
33
34 bool Append(const std::string& key, const RowValue& val){
35 std::string result;
36 val.Serialize(&result);
37 Slice valSlice(result.data(), result.size());
38 auto s = db_->Merge(write_option_, key, valSlice);
39
40 if (s.ok()) {
41 return true;
42 } else {
43 std::cerr << "ERROR " << s.ToString() << std::endl;
44 return false;
45 }
46 }
47
48 bool Put(const std::string& key, const RowValue& val) {
49 std::string result;
50 val.Serialize(&result);
51 Slice valSlice(result.data(), result.size());
52 auto s = db_->Put(write_option_, key, valSlice);
53 if (s.ok()) {
54 return true;
55 } else {
56 std::cerr << "ERROR " << s.ToString() << std::endl;
57 return false;
58 }
59 }
60
61 void Flush() {
62 dbfull()->TEST_FlushMemTable();
63 dbfull()->TEST_WaitForCompact();
64 }
65
66 void Compact() {
67 dbfull()->TEST_CompactRange(
68 0, nullptr, nullptr, db_->DefaultColumnFamily());
69 }
70
71 std::tuple<bool, RowValue> Get(const std::string& key){
72 std::string result;
73 auto s = db_->Get(get_option_, key, &result);
74
75 if (s.ok()) {
76 return std::make_tuple(true,
77 RowValue::Deserialize(result.data(),
78 result.size()));
79 }
80
81 if (!s.IsNotFound()) {
82 std::cerr << "ERROR " << s.ToString() << std::endl;
83 }
84
85 return std::make_tuple(false, RowValue(0, 0));
86 }
87
88 private:
89 std::shared_ptr<DB> db_;
90 WriteOptions write_option_;
91 ReadOptions get_option_;
92
20effc67 93 DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_.get()); }
11fdf7f2
TL
94};
95
96class TestCompactionFilterFactory : public CompactionFilterFactory {
97public:
98 explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
99 int32_t gc_grace_period_in_seconds)
100 : purge_ttl_on_expiration_(purge_ttl_on_expiration),
101 gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
102
494da23a 103 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
11fdf7f2 104 const CompactionFilter::Context& /*context*/) override {
494da23a 105 return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
11fdf7f2 106 purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
494da23a 107 }
11fdf7f2 108
494da23a 109 const char* Name() const override { return "TestCompactionFilterFactory"; }
11fdf7f2
TL
110
111private:
112 bool purge_ttl_on_expiration_;
113 int32_t gc_grace_period_in_seconds_;
114};
115
116
117// The class for unit-testing
118class CassandraFunctionalTest : public testing::Test {
119public:
120 CassandraFunctionalTest() {
121 DestroyDB(kDbName, Options()); // Start each test with a fresh DB
122 }
123
124 std::shared_ptr<DB> OpenDb() {
125 DB* db;
126 Options options;
127 options.create_if_missing = true;
128 options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
129 auto* cf_factory = new TestCompactionFilterFactory(
130 purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
131 options.compaction_filter_factory.reset(cf_factory);
132 EXPECT_OK(DB::Open(options, kDbName, &db));
133 return std::shared_ptr<DB>(db);
134 }
135
136 bool purge_ttl_on_expiration_ = false;
137 int32_t gc_grace_period_in_seconds_ = 100;
138};
139
140// THE TEST CASES BEGIN HERE
141
142TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
143 CassandraStore store(OpenDb());
144 int64_t now = time(nullptr);
145
146 store.Append("k1", CreateTestRowValue({
147 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
148 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
149 CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
150 }));
151 store.Append("k1",CreateTestRowValue({
152 CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
153 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
154 CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
155 CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
156 }));
157 store.Append("k1", CreateTestRowValue({
158 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
159 CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
160 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
161 CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
162 }));
163
164 auto ret = store.Get("k1");
165
166 ASSERT_TRUE(std::get<0>(ret));
167 RowValue& merged = std::get<1>(ret);
20effc67
TL
168 EXPECT_EQ(merged.get_columns().size(), 5);
169 VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0,
170 ToMicroSeconds(now + 6));
171 VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1,
172 ToMicroSeconds(now + 8));
173 VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2,
174 ToMicroSeconds(now + 7));
175 VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7,
176 ToMicroSeconds(now + 17));
177 VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11,
178 ToMicroSeconds(now + 11));
11fdf7f2
TL
179}
180
181TEST_F(CassandraFunctionalTest,
182 CompactionShouldConvertExpiredColumnsToTombstone) {
183 CassandraStore store(OpenDb());
184 int64_t now= time(nullptr);
185
186 store.Append("k1", CreateTestRowValue({
187 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
188 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
189 CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
190 }));
191
192 store.Flush();
193
194 store.Append("k1",CreateTestRowValue({
195 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
196 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
197 }));
198
199 store.Flush();
200 store.Compact();
201
202 auto ret = store.Get("k1");
203 ASSERT_TRUE(std::get<0>(ret));
204 RowValue& merged = std::get<1>(ret);
20effc67
TL
205 EXPECT_EQ(merged.get_columns().size(), 4);
206 VerifyRowValueColumns(merged.get_columns(), 0, kTombstone, 0,
207 ToMicroSeconds(now - 10));
208 VerifyRowValueColumns(merged.get_columns(), 1, kExpiringColumn, 1,
209 ToMicroSeconds(now - kTtl + 10));
210 VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 2,
211 ToMicroSeconds(now));
212 VerifyRowValueColumns(merged.get_columns(), 3, kTombstone, 3,
213 ToMicroSeconds(now));
11fdf7f2
TL
214}
215
216
217TEST_F(CassandraFunctionalTest,
218 CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
219 purge_ttl_on_expiration_ = true;
220 CassandraStore store(OpenDb());
221 int64_t now = time(nullptr);
222
223 store.Append("k1", CreateTestRowValue({
224 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
225 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
226 CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
227 }));
228
229 store.Flush();
230
231 store.Append("k1",CreateTestRowValue({
232 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
233 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
234 }));
235
236 store.Flush();
237 store.Compact();
238
239 auto ret = store.Get("k1");
240 ASSERT_TRUE(std::get<0>(ret));
241 RowValue& merged = std::get<1>(ret);
20effc67
TL
242 EXPECT_EQ(merged.get_columns().size(), 3);
243 VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 1,
244 ToMicroSeconds(now));
245 VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 2,
246 ToMicroSeconds(now));
247 VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 3,
248 ToMicroSeconds(now));
11fdf7f2
TL
249}
250
251TEST_F(CassandraFunctionalTest,
252 CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
253 purge_ttl_on_expiration_ = true;
254 CassandraStore store(OpenDb());
255 int64_t now = time(nullptr);
256
257 store.Append("k1", CreateTestRowValue({
258 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
259 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
260 }));
261
262 store.Flush();
263
264 store.Append("k1",CreateTestRowValue({
265 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
266 }));
267
268 store.Flush();
269 store.Compact();
270 ASSERT_FALSE(std::get<0>(store.Get("k1")));
271}
272
273TEST_F(CassandraFunctionalTest,
274 CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
275 purge_ttl_on_expiration_ = true;
276 CassandraStore store(OpenDb());
277 int64_t now = time(nullptr);
278
279 store.Append("k1", CreateTestRowValue({
280 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
281 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
282 }));
283
284 store.Append("k2", CreateTestRowValue({
285 CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
286 }));
287
288 store.Flush();
289
290 store.Append("k1",CreateTestRowValue({
291 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
292 }));
293
294 store.Flush();
295 store.Compact();
296
297 auto ret = store.Get("k1");
298 ASSERT_TRUE(std::get<0>(ret));
299 RowValue& gced = std::get<1>(ret);
20effc67
TL
300 EXPECT_EQ(gced.get_columns().size(), 1);
301 VerifyRowValueColumns(gced.get_columns(), 0, kColumn, 1, ToMicroSeconds(now));
11fdf7f2
TL
302}
303
304TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
305 purge_ttl_on_expiration_ = true;
306 CassandraStore store(OpenDb());
307 int64_t now = time(nullptr);
308
309 store.Put("k1", CreateTestRowValue({
310 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
311 }));
312
313 store.Flush();
314 store.Compact();
315 ASSERT_FALSE(std::get<0>(store.Get("k1")));
316}
317
318} // namespace cassandra
f67539c2 319} // namespace ROCKSDB_NAMESPACE
11fdf7f2
TL
320
321int main(int argc, char** argv) {
322 ::testing::InitGoogleTest(&argc, argv);
323 return RUN_ALL_TESTS();
324}