]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2017-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #include <iostream> | |
20effc67 | 7 | |
f67539c2 | 8 | #include "db/db_impl/db_impl.h" |
11fdf7f2 | 9 | #include "rocksdb/db.h" |
11fdf7f2 TL |
10 | #include "rocksdb/merge_operator.h" |
11 | #include "rocksdb/utilities/db_ttl.h" | |
f67539c2 | 12 | #include "test_util/testharness.h" |
20effc67 | 13 | #include "util/cast_util.h" |
11fdf7f2 | 14 | #include "util/random.h" |
11fdf7f2 TL |
15 | #include "utilities/cassandra/cassandra_compaction_filter.h" |
16 | #include "utilities/cassandra/merge_operator.h" | |
17 | #include "utilities/cassandra/test_utils.h" | |
f67539c2 | 18 | #include "utilities/merge_operators.h" |
11fdf7f2 | 19 | |
11fdf7f2 | 20 | |
f67539c2 | 21 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
22 | namespace cassandra { |
23 | ||
24 | // Path to the database on file system | |
25 | const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test"); | |
26 | ||
27 | class CassandraStore { | |
28 | public: | |
29 | explicit CassandraStore(std::shared_ptr<DB> db) | |
30 | : db_(db), write_option_(), get_option_() { | |
31 | assert(db); | |
32 | } | |
33 | ||
34 | bool Append(const std::string& key, const RowValue& val){ | |
35 | std::string result; | |
36 | val.Serialize(&result); | |
37 | Slice valSlice(result.data(), result.size()); | |
38 | auto s = db_->Merge(write_option_, key, valSlice); | |
39 | ||
40 | if (s.ok()) { | |
41 | return true; | |
42 | } else { | |
43 | std::cerr << "ERROR " << s.ToString() << std::endl; | |
44 | return false; | |
45 | } | |
46 | } | |
47 | ||
48 | bool Put(const std::string& key, const RowValue& val) { | |
49 | std::string result; | |
50 | val.Serialize(&result); | |
51 | Slice valSlice(result.data(), result.size()); | |
52 | auto s = db_->Put(write_option_, key, valSlice); | |
53 | if (s.ok()) { | |
54 | return true; | |
55 | } else { | |
56 | std::cerr << "ERROR " << s.ToString() << std::endl; | |
57 | return false; | |
58 | } | |
59 | } | |
60 | ||
61 | void Flush() { | |
62 | dbfull()->TEST_FlushMemTable(); | |
63 | dbfull()->TEST_WaitForCompact(); | |
64 | } | |
65 | ||
66 | void Compact() { | |
67 | dbfull()->TEST_CompactRange( | |
68 | 0, nullptr, nullptr, db_->DefaultColumnFamily()); | |
69 | } | |
70 | ||
71 | std::tuple<bool, RowValue> Get(const std::string& key){ | |
72 | std::string result; | |
73 | auto s = db_->Get(get_option_, key, &result); | |
74 | ||
75 | if (s.ok()) { | |
76 | return std::make_tuple(true, | |
77 | RowValue::Deserialize(result.data(), | |
78 | result.size())); | |
79 | } | |
80 | ||
81 | if (!s.IsNotFound()) { | |
82 | std::cerr << "ERROR " << s.ToString() << std::endl; | |
83 | } | |
84 | ||
85 | return std::make_tuple(false, RowValue(0, 0)); | |
86 | } | |
87 | ||
88 | private: | |
89 | std::shared_ptr<DB> db_; | |
90 | WriteOptions write_option_; | |
91 | ReadOptions get_option_; | |
92 | ||
20effc67 | 93 | DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_.get()); } |
11fdf7f2 TL |
94 | }; |
95 | ||
96 | class TestCompactionFilterFactory : public CompactionFilterFactory { | |
97 | public: | |
98 | explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration, | |
99 | int32_t gc_grace_period_in_seconds) | |
100 | : purge_ttl_on_expiration_(purge_ttl_on_expiration), | |
101 | gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} | |
102 | ||
494da23a | 103 | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
11fdf7f2 | 104 | const CompactionFilter::Context& /*context*/) override { |
494da23a | 105 | return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter( |
11fdf7f2 | 106 | purge_ttl_on_expiration_, gc_grace_period_in_seconds_)); |
494da23a | 107 | } |
11fdf7f2 | 108 | |
494da23a | 109 | const char* Name() const override { return "TestCompactionFilterFactory"; } |
11fdf7f2 TL |
110 | |
111 | private: | |
112 | bool purge_ttl_on_expiration_; | |
113 | int32_t gc_grace_period_in_seconds_; | |
114 | }; | |
115 | ||
116 | ||
117 | // The class for unit-testing | |
118 | class CassandraFunctionalTest : public testing::Test { | |
119 | public: | |
120 | CassandraFunctionalTest() { | |
121 | DestroyDB(kDbName, Options()); // Start each test with a fresh DB | |
122 | } | |
123 | ||
124 | std::shared_ptr<DB> OpenDb() { | |
125 | DB* db; | |
126 | Options options; | |
127 | options.create_if_missing = true; | |
128 | options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_)); | |
129 | auto* cf_factory = new TestCompactionFilterFactory( | |
130 | purge_ttl_on_expiration_, gc_grace_period_in_seconds_); | |
131 | options.compaction_filter_factory.reset(cf_factory); | |
132 | EXPECT_OK(DB::Open(options, kDbName, &db)); | |
133 | return std::shared_ptr<DB>(db); | |
134 | } | |
135 | ||
136 | bool purge_ttl_on_expiration_ = false; | |
137 | int32_t gc_grace_period_in_seconds_ = 100; | |
138 | }; | |
139 | ||
140 | // THE TEST CASES BEGIN HERE | |
141 | ||
142 | TEST_F(CassandraFunctionalTest, SimpleMergeTest) { | |
143 | CassandraStore store(OpenDb()); | |
144 | int64_t now = time(nullptr); | |
145 | ||
146 | store.Append("k1", CreateTestRowValue({ | |
147 | CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)), | |
148 | CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)), | |
149 | CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)), | |
150 | })); | |
151 | store.Append("k1",CreateTestRowValue({ | |
152 | CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)), | |
153 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)), | |
154 | CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)), | |
155 | CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)), | |
156 | })); | |
157 | store.Append("k1", CreateTestRowValue({ | |
158 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)), | |
159 | CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)), | |
160 | CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)), | |
161 | CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)), | |
162 | })); | |
163 | ||
164 | auto ret = store.Get("k1"); | |
165 | ||
166 | ASSERT_TRUE(std::get<0>(ret)); | |
167 | RowValue& merged = std::get<1>(ret); | |
20effc67 TL |
168 | EXPECT_EQ(merged.get_columns().size(), 5); |
169 | VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0, | |
170 | ToMicroSeconds(now + 6)); | |
171 | VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1, | |
172 | ToMicroSeconds(now + 8)); | |
173 | VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2, | |
174 | ToMicroSeconds(now + 7)); | |
175 | VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7, | |
176 | ToMicroSeconds(now + 17)); | |
177 | VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11, | |
178 | ToMicroSeconds(now + 11)); | |
11fdf7f2 TL |
179 | } |
180 | ||
181 | TEST_F(CassandraFunctionalTest, | |
182 | CompactionShouldConvertExpiredColumnsToTombstone) { | |
183 | CassandraStore store(OpenDb()); | |
184 | int64_t now= time(nullptr); | |
185 | ||
186 | store.Append("k1", CreateTestRowValue({ | |
187 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired | |
188 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired | |
189 | CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) | |
190 | })); | |
191 | ||
192 | store.Flush(); | |
193 | ||
194 | store.Append("k1",CreateTestRowValue({ | |
195 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired | |
196 | CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) | |
197 | })); | |
198 | ||
199 | store.Flush(); | |
200 | store.Compact(); | |
201 | ||
202 | auto ret = store.Get("k1"); | |
203 | ASSERT_TRUE(std::get<0>(ret)); | |
204 | RowValue& merged = std::get<1>(ret); | |
20effc67 TL |
205 | EXPECT_EQ(merged.get_columns().size(), 4); |
206 | VerifyRowValueColumns(merged.get_columns(), 0, kTombstone, 0, | |
207 | ToMicroSeconds(now - 10)); | |
208 | VerifyRowValueColumns(merged.get_columns(), 1, kExpiringColumn, 1, | |
209 | ToMicroSeconds(now - kTtl + 10)); | |
210 | VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 2, | |
211 | ToMicroSeconds(now)); | |
212 | VerifyRowValueColumns(merged.get_columns(), 3, kTombstone, 3, | |
213 | ToMicroSeconds(now)); | |
11fdf7f2 TL |
214 | } |
215 | ||
216 | ||
217 | TEST_F(CassandraFunctionalTest, | |
218 | CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) { | |
219 | purge_ttl_on_expiration_ = true; | |
220 | CassandraStore store(OpenDb()); | |
221 | int64_t now = time(nullptr); | |
222 | ||
223 | store.Append("k1", CreateTestRowValue({ | |
224 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired | |
225 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired | |
226 | CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) | |
227 | })); | |
228 | ||
229 | store.Flush(); | |
230 | ||
231 | store.Append("k1",CreateTestRowValue({ | |
232 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired | |
233 | CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) | |
234 | })); | |
235 | ||
236 | store.Flush(); | |
237 | store.Compact(); | |
238 | ||
239 | auto ret = store.Get("k1"); | |
240 | ASSERT_TRUE(std::get<0>(ret)); | |
241 | RowValue& merged = std::get<1>(ret); | |
20effc67 TL |
242 | EXPECT_EQ(merged.get_columns().size(), 3); |
243 | VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 1, | |
244 | ToMicroSeconds(now)); | |
245 | VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 2, | |
246 | ToMicroSeconds(now)); | |
247 | VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 3, | |
248 | ToMicroSeconds(now)); | |
11fdf7f2 TL |
249 | } |
250 | ||
251 | TEST_F(CassandraFunctionalTest, | |
252 | CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) { | |
253 | purge_ttl_on_expiration_ = true; | |
254 | CassandraStore store(OpenDb()); | |
255 | int64_t now = time(nullptr); | |
256 | ||
257 | store.Append("k1", CreateTestRowValue({ | |
258 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), | |
259 | CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)), | |
260 | })); | |
261 | ||
262 | store.Flush(); | |
263 | ||
264 | store.Append("k1",CreateTestRowValue({ | |
265 | CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), | |
266 | })); | |
267 | ||
268 | store.Flush(); | |
269 | store.Compact(); | |
270 | ASSERT_FALSE(std::get<0>(store.Get("k1"))); | |
271 | } | |
272 | ||
273 | TEST_F(CassandraFunctionalTest, | |
274 | CompactionShouldRemoveTombstoneExceedingGCGracePeriod) { | |
275 | purge_ttl_on_expiration_ = true; | |
276 | CassandraStore store(OpenDb()); | |
277 | int64_t now = time(nullptr); | |
278 | ||
279 | store.Append("k1", CreateTestRowValue({ | |
280 | CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), | |
281 | CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)) | |
282 | })); | |
283 | ||
284 | store.Append("k2", CreateTestRowValue({ | |
285 | CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)) | |
286 | })); | |
287 | ||
288 | store.Flush(); | |
289 | ||
290 | store.Append("k1",CreateTestRowValue({ | |
291 | CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)), | |
292 | })); | |
293 | ||
294 | store.Flush(); | |
295 | store.Compact(); | |
296 | ||
297 | auto ret = store.Get("k1"); | |
298 | ASSERT_TRUE(std::get<0>(ret)); | |
299 | RowValue& gced = std::get<1>(ret); | |
20effc67 TL |
300 | EXPECT_EQ(gced.get_columns().size(), 1); |
301 | VerifyRowValueColumns(gced.get_columns(), 0, kColumn, 1, ToMicroSeconds(now)); | |
11fdf7f2 TL |
302 | } |
303 | ||
304 | TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) { | |
305 | purge_ttl_on_expiration_ = true; | |
306 | CassandraStore store(OpenDb()); | |
307 | int64_t now = time(nullptr); | |
308 | ||
309 | store.Put("k1", CreateTestRowValue({ | |
310 | CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), | |
311 | })); | |
312 | ||
313 | store.Flush(); | |
314 | store.Compact(); | |
315 | ASSERT_FALSE(std::get<0>(store.Get("k1"))); | |
316 | } | |
317 | ||
318 | } // namespace cassandra | |
f67539c2 | 319 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 TL |
320 | |
321 | int main(int argc, char** argv) { | |
322 | ::testing::InitGoogleTest(&argc, argv); | |
323 | return RUN_ALL_TESTS(); | |
324 | } |