]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2017-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #include <iostream> | |
f67539c2 | 7 | #include "db/db_impl/db_impl.h" |
11fdf7f2 | 8 | #include "rocksdb/db.h" |
11fdf7f2 TL |
9 | #include "rocksdb/merge_operator.h" |
10 | #include "rocksdb/utilities/db_ttl.h" | |
f67539c2 | 11 | #include "test_util/testharness.h" |
11fdf7f2 | 12 | #include "util/random.h" |
11fdf7f2 TL |
13 | #include "utilities/cassandra/cassandra_compaction_filter.h" |
14 | #include "utilities/cassandra/merge_operator.h" | |
15 | #include "utilities/cassandra/test_utils.h" | |
f67539c2 | 16 | #include "utilities/merge_operators.h" |
11fdf7f2 | 17 | |
f67539c2 | 18 | using namespace ROCKSDB_NAMESPACE; |
11fdf7f2 | 19 | |
f67539c2 | 20 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
21 | namespace cassandra { |
22 | ||
23 | // Path to the database on file system | |
24 | const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test"); | |
25 | ||
26 | class CassandraStore { | |
27 | public: | |
28 | explicit CassandraStore(std::shared_ptr<DB> db) | |
29 | : db_(db), write_option_(), get_option_() { | |
30 | assert(db); | |
31 | } | |
32 | ||
33 | bool Append(const std::string& key, const RowValue& val){ | |
34 | std::string result; | |
35 | val.Serialize(&result); | |
36 | Slice valSlice(result.data(), result.size()); | |
37 | auto s = db_->Merge(write_option_, key, valSlice); | |
38 | ||
39 | if (s.ok()) { | |
40 | return true; | |
41 | } else { | |
42 | std::cerr << "ERROR " << s.ToString() << std::endl; | |
43 | return false; | |
44 | } | |
45 | } | |
46 | ||
47 | bool Put(const std::string& key, const RowValue& val) { | |
48 | std::string result; | |
49 | val.Serialize(&result); | |
50 | Slice valSlice(result.data(), result.size()); | |
51 | auto s = db_->Put(write_option_, key, valSlice); | |
52 | if (s.ok()) { | |
53 | return true; | |
54 | } else { | |
55 | std::cerr << "ERROR " << s.ToString() << std::endl; | |
56 | return false; | |
57 | } | |
58 | } | |
59 | ||
60 | void Flush() { | |
61 | dbfull()->TEST_FlushMemTable(); | |
62 | dbfull()->TEST_WaitForCompact(); | |
63 | } | |
64 | ||
65 | void Compact() { | |
66 | dbfull()->TEST_CompactRange( | |
67 | 0, nullptr, nullptr, db_->DefaultColumnFamily()); | |
68 | } | |
69 | ||
70 | std::tuple<bool, RowValue> Get(const std::string& key){ | |
71 | std::string result; | |
72 | auto s = db_->Get(get_option_, key, &result); | |
73 | ||
74 | if (s.ok()) { | |
75 | return std::make_tuple(true, | |
76 | RowValue::Deserialize(result.data(), | |
77 | result.size())); | |
78 | } | |
79 | ||
80 | if (!s.IsNotFound()) { | |
81 | std::cerr << "ERROR " << s.ToString() << std::endl; | |
82 | } | |
83 | ||
84 | return std::make_tuple(false, RowValue(0, 0)); | |
85 | } | |
86 | ||
87 | private: | |
88 | std::shared_ptr<DB> db_; | |
89 | WriteOptions write_option_; | |
90 | ReadOptions get_option_; | |
91 | ||
92 | DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); } | |
93 | }; | |
94 | ||
95 | class TestCompactionFilterFactory : public CompactionFilterFactory { | |
96 | public: | |
97 | explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration, | |
98 | int32_t gc_grace_period_in_seconds) | |
99 | : purge_ttl_on_expiration_(purge_ttl_on_expiration), | |
100 | gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} | |
101 | ||
494da23a | 102 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
11fdf7f2 | 103 | const CompactionFilter::Context& /*context*/) override { |
494da23a | 104 | return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter( |
11fdf7f2 | 105 | purge_ttl_on_expiration_, gc_grace_period_in_seconds_)); |
494da23a | 106 | } |
11fdf7f2 | 107 | |
494da23a | 108 | const char* Name() const override { return "TestCompactionFilterFactory"; } |
11fdf7f2 TL |
109 | |
110 | private: | |
111 | bool purge_ttl_on_expiration_; | |
112 | int32_t gc_grace_period_in_seconds_; | |
113 | }; | |
114 | ||
115 | ||
116 | // The class for unit-testing | |
117 | class CassandraFunctionalTest : public testing::Test { | |
118 | public: | |
119 | CassandraFunctionalTest() { | |
120 | DestroyDB(kDbName, Options()); // Start each test with a fresh DB | |
121 | } | |
122 | ||
123 | std::shared_ptr<DB> OpenDb() { | |
124 | DB* db; | |
125 | Options options; | |
126 | options.create_if_missing = true; | |
127 | options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_)); | |
128 | auto* cf_factory = new TestCompactionFilterFactory( | |
129 | purge_ttl_on_expiration_, gc_grace_period_in_seconds_); | |
130 | options.compaction_filter_factory.reset(cf_factory); | |
131 | EXPECT_OK(DB::Open(options, kDbName, &db)); | |
132 | return std::shared_ptr<DB>(db); | |
133 | } | |
134 | ||
135 | bool purge_ttl_on_expiration_ = false; | |
136 | int32_t gc_grace_period_in_seconds_ = 100; | |
137 | }; | |
138 | ||
139 | // THE TEST CASES BEGIN HERE | |
140 | ||
141 | TEST_F(CassandraFunctionalTest, SimpleMergeTest) { | |
142 | CassandraStore store(OpenDb()); | |
143 | int64_t now = time(nullptr); | |
144 | ||
145 | store.Append("k1", CreateTestRowValue({ | |
146 | CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)), | |
147 | CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)), | |
148 | CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)), | |
149 | })); | |
150 | store.Append("k1",CreateTestRowValue({ | |
151 | CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)), | |
152 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)), | |
153 | CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)), | |
154 | CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)), | |
155 | })); | |
156 | store.Append("k1", CreateTestRowValue({ | |
157 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)), | |
158 | CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)), | |
159 | CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)), | |
160 | CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)), | |
161 | })); | |
162 | ||
163 | auto ret = store.Get("k1"); | |
164 | ||
165 | ASSERT_TRUE(std::get<0>(ret)); | |
166 | RowValue& merged = std::get<1>(ret); | |
167 | EXPECT_EQ(merged.columns_.size(), 5); | |
168 | VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6)); | |
169 | VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8)); | |
170 | VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7)); | |
171 | VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17)); | |
172 | VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11)); | |
173 | } | |
174 | ||
175 | TEST_F(CassandraFunctionalTest, | |
176 | CompactionShouldConvertExpiredColumnsToTombstone) { | |
177 | CassandraStore store(OpenDb()); | |
178 | int64_t now= time(nullptr); | |
179 | ||
180 | store.Append("k1", CreateTestRowValue({ | |
181 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired | |
182 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired | |
183 | CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) | |
184 | })); | |
185 | ||
186 | store.Flush(); | |
187 | ||
188 | store.Append("k1",CreateTestRowValue({ | |
189 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired | |
190 | CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) | |
191 | })); | |
192 | ||
193 | store.Flush(); | |
194 | store.Compact(); | |
195 | ||
196 | auto ret = store.Get("k1"); | |
197 | ASSERT_TRUE(std::get<0>(ret)); | |
198 | RowValue& merged = std::get<1>(ret); | |
199 | EXPECT_EQ(merged.columns_.size(), 4); | |
200 | VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10)); | |
201 | VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)); | |
202 | VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now)); | |
203 | VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); | |
204 | } | |
205 | ||
206 | ||
207 | TEST_F(CassandraFunctionalTest, | |
208 | CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) { | |
209 | purge_ttl_on_expiration_ = true; | |
210 | CassandraStore store(OpenDb()); | |
211 | int64_t now = time(nullptr); | |
212 | ||
213 | store.Append("k1", CreateTestRowValue({ | |
214 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired | |
215 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired | |
216 | CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) | |
217 | })); | |
218 | ||
219 | store.Flush(); | |
220 | ||
221 | store.Append("k1",CreateTestRowValue({ | |
222 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired | |
223 | CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) | |
224 | })); | |
225 | ||
226 | store.Flush(); | |
227 | store.Compact(); | |
228 | ||
229 | auto ret = store.Get("k1"); | |
230 | ASSERT_TRUE(std::get<0>(ret)); | |
231 | RowValue& merged = std::get<1>(ret); | |
232 | EXPECT_EQ(merged.columns_.size(), 3); | |
233 | VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now)); | |
234 | VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now)); | |
235 | VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); | |
236 | } | |
237 | ||
238 | TEST_F(CassandraFunctionalTest, | |
239 | CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) { | |
240 | purge_ttl_on_expiration_ = true; | |
241 | CassandraStore store(OpenDb()); | |
242 | int64_t now = time(nullptr); | |
243 | ||
244 | store.Append("k1", CreateTestRowValue({ | |
245 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), | |
246 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)), | |
247 | })); | |
248 | ||
249 | store.Flush(); | |
250 | ||
251 | store.Append("k1",CreateTestRowValue({ | |
252 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), | |
253 | })); | |
254 | ||
255 | store.Flush(); | |
256 | store.Compact(); | |
257 | ASSERT_FALSE(std::get<0>(store.Get("k1"))); | |
258 | } | |
259 | ||
260 | TEST_F(CassandraFunctionalTest, | |
261 | CompactionShouldRemoveTombstoneExceedingGCGracePeriod) { | |
262 | purge_ttl_on_expiration_ = true; | |
263 | CassandraStore store(OpenDb()); | |
264 | int64_t now = time(nullptr); | |
265 | ||
266 | store.Append("k1", CreateTestRowValue({ | |
267 | CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), | |
268 | CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)) | |
269 | })); | |
270 | ||
271 | store.Append("k2", CreateTestRowValue({ | |
272 | CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)) | |
273 | })); | |
274 | ||
275 | store.Flush(); | |
276 | ||
277 | store.Append("k1",CreateTestRowValue({ | |
278 | CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)), | |
279 | })); | |
280 | ||
281 | store.Flush(); | |
282 | store.Compact(); | |
283 | ||
284 | auto ret = store.Get("k1"); | |
285 | ASSERT_TRUE(std::get<0>(ret)); | |
286 | RowValue& gced = std::get<1>(ret); | |
287 | EXPECT_EQ(gced.columns_.size(), 1); | |
288 | VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now)); | |
289 | } | |
290 | ||
291 | TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) { | |
292 | purge_ttl_on_expiration_ = true; | |
293 | CassandraStore store(OpenDb()); | |
294 | int64_t now = time(nullptr); | |
295 | ||
296 | store.Put("k1", CreateTestRowValue({ | |
297 | CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), | |
298 | })); | |
299 | ||
300 | store.Flush(); | |
301 | store.Compact(); | |
302 | ASSERT_FALSE(std::get<0>(store.Get("k1"))); | |
303 | } | |
304 | ||
305 | } // namespace cassandra | |
f67539c2 | 306 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
307 | |
308 | int main(int argc, char** argv) { | |
309 | ::testing::InitGoogleTest(&argc, argv); | |
310 | return RUN_ALL_TESTS(); | |
311 | } |