]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / cassandra / cassandra_functional_test.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include <iostream>
f67539c2 7#include "db/db_impl/db_impl.h"
11fdf7f2 8#include "rocksdb/db.h"
11fdf7f2
TL
9#include "rocksdb/merge_operator.h"
10#include "rocksdb/utilities/db_ttl.h"
f67539c2 11#include "test_util/testharness.h"
11fdf7f2 12#include "util/random.h"
11fdf7f2
TL
13#include "utilities/cassandra/cassandra_compaction_filter.h"
14#include "utilities/cassandra/merge_operator.h"
15#include "utilities/cassandra/test_utils.h"
f67539c2 16#include "utilities/merge_operators.h"
11fdf7f2 17
f67539c2 18using namespace ROCKSDB_NAMESPACE;
11fdf7f2 19
f67539c2 20namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
21namespace cassandra {
22
23// Path to the database on file system
24const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
25
26class CassandraStore {
27 public:
28 explicit CassandraStore(std::shared_ptr<DB> db)
29 : db_(db), write_option_(), get_option_() {
30 assert(db);
31 }
32
33 bool Append(const std::string& key, const RowValue& val){
34 std::string result;
35 val.Serialize(&result);
36 Slice valSlice(result.data(), result.size());
37 auto s = db_->Merge(write_option_, key, valSlice);
38
39 if (s.ok()) {
40 return true;
41 } else {
42 std::cerr << "ERROR " << s.ToString() << std::endl;
43 return false;
44 }
45 }
46
47 bool Put(const std::string& key, const RowValue& val) {
48 std::string result;
49 val.Serialize(&result);
50 Slice valSlice(result.data(), result.size());
51 auto s = db_->Put(write_option_, key, valSlice);
52 if (s.ok()) {
53 return true;
54 } else {
55 std::cerr << "ERROR " << s.ToString() << std::endl;
56 return false;
57 }
58 }
59
60 void Flush() {
61 dbfull()->TEST_FlushMemTable();
62 dbfull()->TEST_WaitForCompact();
63 }
64
65 void Compact() {
66 dbfull()->TEST_CompactRange(
67 0, nullptr, nullptr, db_->DefaultColumnFamily());
68 }
69
70 std::tuple<bool, RowValue> Get(const std::string& key){
71 std::string result;
72 auto s = db_->Get(get_option_, key, &result);
73
74 if (s.ok()) {
75 return std::make_tuple(true,
76 RowValue::Deserialize(result.data(),
77 result.size()));
78 }
79
80 if (!s.IsNotFound()) {
81 std::cerr << "ERROR " << s.ToString() << std::endl;
82 }
83
84 return std::make_tuple(false, RowValue(0, 0));
85 }
86
87 private:
88 std::shared_ptr<DB> db_;
89 WriteOptions write_option_;
90 ReadOptions get_option_;
91
92 DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }
93};
94
95class TestCompactionFilterFactory : public CompactionFilterFactory {
96public:
97 explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
98 int32_t gc_grace_period_in_seconds)
99 : purge_ttl_on_expiration_(purge_ttl_on_expiration),
100 gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
101
494da23a 102 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
11fdf7f2 103 const CompactionFilter::Context& /*context*/) override {
494da23a 104 return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
11fdf7f2 105 purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
494da23a 106 }
11fdf7f2 107
494da23a 108 const char* Name() const override { return "TestCompactionFilterFactory"; }
11fdf7f2
TL
109
110private:
111 bool purge_ttl_on_expiration_;
112 int32_t gc_grace_period_in_seconds_;
113};
114
115
116// The class for unit-testing
117class CassandraFunctionalTest : public testing::Test {
118public:
119 CassandraFunctionalTest() {
120 DestroyDB(kDbName, Options()); // Start each test with a fresh DB
121 }
122
123 std::shared_ptr<DB> OpenDb() {
124 DB* db;
125 Options options;
126 options.create_if_missing = true;
127 options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
128 auto* cf_factory = new TestCompactionFilterFactory(
129 purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
130 options.compaction_filter_factory.reset(cf_factory);
131 EXPECT_OK(DB::Open(options, kDbName, &db));
132 return std::shared_ptr<DB>(db);
133 }
134
135 bool purge_ttl_on_expiration_ = false;
136 int32_t gc_grace_period_in_seconds_ = 100;
137};
138
139// THE TEST CASES BEGIN HERE
140
141TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
142 CassandraStore store(OpenDb());
143 int64_t now = time(nullptr);
144
145 store.Append("k1", CreateTestRowValue({
146 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
147 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
148 CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
149 }));
150 store.Append("k1",CreateTestRowValue({
151 CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
152 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
153 CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
154 CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
155 }));
156 store.Append("k1", CreateTestRowValue({
157 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
158 CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
159 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
160 CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
161 }));
162
163 auto ret = store.Get("k1");
164
165 ASSERT_TRUE(std::get<0>(ret));
166 RowValue& merged = std::get<1>(ret);
167 EXPECT_EQ(merged.columns_.size(), 5);
168 VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6));
169 VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8));
170 VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7));
171 VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17));
172 VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11));
173}
174
175TEST_F(CassandraFunctionalTest,
176 CompactionShouldConvertExpiredColumnsToTombstone) {
177 CassandraStore store(OpenDb());
178 int64_t now= time(nullptr);
179
180 store.Append("k1", CreateTestRowValue({
181 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
182 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
183 CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
184 }));
185
186 store.Flush();
187
188 store.Append("k1",CreateTestRowValue({
189 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
190 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
191 }));
192
193 store.Flush();
194 store.Compact();
195
196 auto ret = store.Get("k1");
197 ASSERT_TRUE(std::get<0>(ret));
198 RowValue& merged = std::get<1>(ret);
199 EXPECT_EQ(merged.columns_.size(), 4);
200 VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10));
201 VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10));
202 VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
203 VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
204}
205
206
207TEST_F(CassandraFunctionalTest,
208 CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
209 purge_ttl_on_expiration_ = true;
210 CassandraStore store(OpenDb());
211 int64_t now = time(nullptr);
212
213 store.Append("k1", CreateTestRowValue({
214 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
215 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
216 CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
217 }));
218
219 store.Flush();
220
221 store.Append("k1",CreateTestRowValue({
222 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
223 CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
224 }));
225
226 store.Flush();
227 store.Compact();
228
229 auto ret = store.Get("k1");
230 ASSERT_TRUE(std::get<0>(ret));
231 RowValue& merged = std::get<1>(ret);
232 EXPECT_EQ(merged.columns_.size(), 3);
233 VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now));
234 VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
235 VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
236}
237
238TEST_F(CassandraFunctionalTest,
239 CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
240 purge_ttl_on_expiration_ = true;
241 CassandraStore store(OpenDb());
242 int64_t now = time(nullptr);
243
244 store.Append("k1", CreateTestRowValue({
245 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
246 CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
247 }));
248
249 store.Flush();
250
251 store.Append("k1",CreateTestRowValue({
252 CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
253 }));
254
255 store.Flush();
256 store.Compact();
257 ASSERT_FALSE(std::get<0>(store.Get("k1")));
258}
259
260TEST_F(CassandraFunctionalTest,
261 CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
262 purge_ttl_on_expiration_ = true;
263 CassandraStore store(OpenDb());
264 int64_t now = time(nullptr);
265
266 store.Append("k1", CreateTestRowValue({
267 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
268 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
269 }));
270
271 store.Append("k2", CreateTestRowValue({
272 CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
273 }));
274
275 store.Flush();
276
277 store.Append("k1",CreateTestRowValue({
278 CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
279 }));
280
281 store.Flush();
282 store.Compact();
283
284 auto ret = store.Get("k1");
285 ASSERT_TRUE(std::get<0>(ret));
286 RowValue& gced = std::get<1>(ret);
287 EXPECT_EQ(gced.columns_.size(), 1);
288 VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
289}
290
291TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
292 purge_ttl_on_expiration_ = true;
293 CassandraStore store(OpenDb());
294 int64_t now = time(nullptr);
295
296 store.Put("k1", CreateTestRowValue({
297 CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
298 }));
299
300 store.Flush();
301 store.Compact();
302 ASSERT_FALSE(std::get<0>(store.Get("k1")));
303}
304
305} // namespace cassandra
f67539c2 306} // namespace ROCKSDB_NAMESPACE
11fdf7f2
TL
307
308int main(int argc, char** argv) {
309 ::testing::InitGoogleTest(&argc, argv);
310 return RUN_ALL_TESTS();
311}