1 // Copyright (c) 2016, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #if !defined(ROCKSDB_LITE)
14 #include "db/db_test_util.h"
15 #include "port/stack_trace.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h"
19 #include "util/testharness.h"
23 class StopOnErrorLogger
: public Logger
{
26 virtual void Logv(const char* format
, va_list ap
) override
{
27 vfprintf(stderr
, format
, ap
);
28 fprintf(stderr
, "\n");
34 class RocksLuaTest
: public testing::Test
{
36 RocksLuaTest() : rnd_(301) {
37 temp_dir_
= test::TmpDir(Env::Default());
41 std::string
RandomString(int len
) {
43 for (int i
= 0; i
< len
; ++i
) {
44 res
+= rnd_
.Uniform(26) + 'a';
49 void CreateDBWithLuaCompactionFilter(
50 const lua::RocksLuaCompactionFilterOptions
& lua_opt
,
51 const std::string
& db_path
,
52 std::unordered_map
<std::string
, std::string
>* kvs
,
53 const int kNumFlushes
= 5,
54 std::shared_ptr
<rocksdb::lua::RocksLuaCompactionFilterFactory
>*
55 output_factory
= nullptr) {
56 const int kKeySize
= 10;
57 const int kValueSize
= 50;
58 const int kKeysPerFlush
= 2;
60 std::make_shared
<rocksdb::lua::RocksLuaCompactionFilterFactory
>(
62 if (output_factory
!= nullptr) {
63 *output_factory
= factory
;
67 options_
.create_if_missing
= true;
68 options_
.compaction_filter_factory
= factory
;
69 options_
.disable_auto_compactions
= true;
70 options_
.max_bytes_for_level_base
=
71 (kKeySize
+ kValueSize
) * kKeysPerFlush
* 2;
72 options_
.max_bytes_for_level_multiplier
= 2;
73 options_
.target_file_size_base
= (kKeySize
+ kValueSize
) * kKeysPerFlush
;
74 options_
.level0_file_num_compaction_trigger
= 2;
75 DestroyDB(db_path
, options_
);
76 ASSERT_OK(DB::Open(options_
, db_path
, &db_
));
78 for (int f
= 0; f
< kNumFlushes
; ++f
) {
79 for (int i
= 0; i
< kKeysPerFlush
; ++i
) {
80 std::string key
= RandomString(kKeySize
);
81 std::string value
= RandomString(kValueSize
);
82 kvs
->insert({key
, value
});
83 ASSERT_OK(db_
->Put(WriteOptions(), key
, value
));
85 db_
->Flush(FlushOptions());
94 std::string temp_dir_
;
100 TEST_F(RocksLuaTest
, Default
) {
101 // If nothing is set in the LuaCompactionFilterOptions, then
102 // RocksDB will keep all the key / value pairs, but it will also
103 // print our error log indicating failure.
104 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
106 lua::RocksLuaCompactionFilterOptions lua_opt
;
108 std::unordered_map
<std::string
, std::string
> kvs
;
109 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
111 for (auto const& entry
: kvs
) {
113 ASSERT_OK(db_
->Get(ReadOptions(), entry
.first
, &value
));
114 ASSERT_EQ(value
, entry
.second
);
118 TEST_F(RocksLuaTest
, KeepsAll
) {
119 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
121 lua::RocksLuaCompactionFilterOptions lua_opt
;
122 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
123 // keeps all the key value pairs
125 "function Filter(level, key, existing_value)\n"
126 " return false, false, \"\"\n"
129 "function FilterMergeOperand(level, key, operand)\n"
133 " return \"KeepsAll\"\n"
137 std::unordered_map
<std::string
, std::string
> kvs
;
138 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
140 for (auto const& entry
: kvs
) {
142 ASSERT_OK(db_
->Get(ReadOptions(), entry
.first
, &value
));
143 ASSERT_EQ(value
, entry
.second
);
147 TEST_F(RocksLuaTest
, GetName
) {
148 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
150 lua::RocksLuaCompactionFilterOptions lua_opt
;
151 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
152 const std::string kScriptName
= "SimpleLuaCompactionFilter";
155 "function Filter(level, key, existing_value)\n"
156 " return false, false, \"\"\n"
159 "function FilterMergeOperand(level, key, operand)\n"
163 " return \"") + kScriptName
+ "\"\n"
167 std::shared_ptr
<CompactionFilterFactory
> factory
=
168 std::make_shared
<lua::RocksLuaCompactionFilterFactory
>(lua_opt
);
169 std::string
factory_name(factory
->Name());
170 ASSERT_NE(factory_name
.find(kScriptName
), std::string::npos
);
173 TEST_F(RocksLuaTest
, RemovesAll
) {
174 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
176 lua::RocksLuaCompactionFilterOptions lua_opt
;
177 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
178 // removes all the key value pairs
180 "function Filter(level, key, existing_value)\n"
181 " return true, false, \"\"\n"
184 "function FilterMergeOperand(level, key, operand)\n"
188 " return \"RemovesAll\"\n"
192 std::unordered_map
<std::string
, std::string
> kvs
;
193 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
194 // Issue full compaction and expect nothing is in the DB.
195 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
197 for (auto const& entry
: kvs
) {
199 auto s
= db_
->Get(ReadOptions(), entry
.first
, &value
);
200 ASSERT_TRUE(s
.IsNotFound());
204 TEST_F(RocksLuaTest
, FilterByKey
) {
205 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
207 lua::RocksLuaCompactionFilterOptions lua_opt
;
208 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
209 // removes all keys whose initial is less than 'r'
211 "function Filter(level, key, existing_value)\n"
212 " if key:sub(1,1) < 'r' then\n"
213 " return true, false, \"\"\n"
215 " return false, false, \"\"\n"
218 "function FilterMergeOperand(level, key, operand)\n"
222 " return \"KeepsAll\"\n"
225 std::unordered_map
<std::string
, std::string
> kvs
;
226 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
227 // Issue full compaction and expect nothing is in the DB.
228 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
230 for (auto const& entry
: kvs
) {
232 auto s
= db_
->Get(ReadOptions(), entry
.first
, &value
);
233 if (entry
.first
[0] < 'r') {
234 ASSERT_TRUE(s
.IsNotFound());
237 ASSERT_TRUE(value
== entry
.second
);
242 TEST_F(RocksLuaTest
, FilterByValue
) {
243 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
245 lua::RocksLuaCompactionFilterOptions lua_opt
;
246 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
247 // removes all values whose initial is less than 'r'
249 "function Filter(level, key, existing_value)\n"
250 " if existing_value:sub(1,1) < 'r' then\n"
251 " return true, false, \"\"\n"
253 " return false, false, \"\"\n"
256 "function FilterMergeOperand(level, key, operand)\n"
260 " return \"FilterByValue\"\n"
264 std::unordered_map
<std::string
, std::string
> kvs
;
265 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
266 // Issue full compaction and expect nothing is in the DB.
267 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
269 for (auto const& entry
: kvs
) {
271 auto s
= db_
->Get(ReadOptions(), entry
.first
, &value
);
272 if (entry
.second
[0] < 'r') {
273 ASSERT_TRUE(s
.IsNotFound());
276 ASSERT_EQ(value
, entry
.second
);
281 TEST_F(RocksLuaTest
, ChangeValue
) {
282 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
284 lua::RocksLuaCompactionFilterOptions lua_opt
;
285 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
286 // Replace all values by their reversed key
288 "function Filter(level, key, existing_value)\n"
289 " return false, true, key:reverse()\n"
292 "function FilterMergeOperand(level, key, operand)\n"
296 " return \"ChangeValue\"\n"
300 std::unordered_map
<std::string
, std::string
> kvs
;
301 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
302 // Issue full compaction and expect nothing is in the DB.
303 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
305 for (auto const& entry
: kvs
) {
307 ASSERT_OK(db_
->Get(ReadOptions(), entry
.first
, &value
));
308 std::string new_value
= entry
.first
;
309 std::reverse(new_value
.begin(), new_value
.end());
310 ASSERT_EQ(value
, new_value
);
314 TEST_F(RocksLuaTest
, ConditionallyChangeAndFilterValue
) {
315 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
317 lua::RocksLuaCompactionFilterOptions lua_opt
;
318 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
319 // Performs the following logic:
320 // If key[0] < 'h' --> replace value by reverse key
321 // If key[0] >= 'r' --> keep the original key value
322 // Otherwise, filter the key value
324 "function Filter(level, key, existing_value)\n"
325 " if key:sub(1,1) < 'h' then\n"
326 " return false, true, key:reverse()\n"
327 " elseif key:sub(1,1) < 'r' then\n"
328 " return true, false, \"\"\n"
330 " return false, false, \"\"\n"
333 " return \"ConditionallyChangeAndFilterValue\"\n"
337 std::unordered_map
<std::string
, std::string
> kvs
;
338 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
);
339 // Issue full compaction and expect nothing is in the DB.
340 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
342 for (auto const& entry
: kvs
) {
344 auto s
= db_
->Get(ReadOptions(), entry
.first
, &value
);
345 if (entry
.first
[0] < 'h') {
347 std::string new_value
= entry
.first
;
348 std::reverse(new_value
.begin(), new_value
.end());
349 ASSERT_EQ(value
, new_value
);
350 } else if (entry
.first
[0] < 'r') {
351 ASSERT_TRUE(s
.IsNotFound());
354 ASSERT_EQ(value
, entry
.second
);
359 TEST_F(RocksLuaTest
, DynamicChangeScript
) {
360 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
362 lua::RocksLuaCompactionFilterOptions lua_opt
;
363 lua_opt
.error_log
= std::make_shared
<StopOnErrorLogger
>();
364 // keeps all the key value pairs
366 "function Filter(level, key, existing_value)\n"
367 " return false, false, \"\"\n"
370 "function FilterMergeOperand(level, key, operand)\n"
374 " return \"KeepsAll\"\n"
378 std::unordered_map
<std::string
, std::string
> kvs
;
379 std::shared_ptr
<rocksdb::lua::RocksLuaCompactionFilterFactory
> factory
;
380 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
, 30, &factory
);
382 ASSERT_TRUE(db_
->GetIntProperty(
383 rocksdb::DB::Properties::kNumEntriesActiveMemTable
, &count
));
385 ASSERT_TRUE(db_
->GetIntProperty(
386 rocksdb::DB::Properties::kNumEntriesImmMemTables
, &count
));
389 CompactRangeOptions cr_opt
;
390 cr_opt
.bottommost_level_compaction
=
391 rocksdb::BottommostLevelCompaction::kForce
;
393 // Issue full compaction and expect everything is in the DB.
394 ASSERT_OK(db_
->CompactRange(cr_opt
, nullptr, nullptr));
396 for (auto const& entry
: kvs
) {
398 ASSERT_OK(db_
->Get(ReadOptions(), entry
.first
, &value
));
399 ASSERT_EQ(value
, entry
.second
);
402 // change the lua script to removes all the key value pairs
404 "function Filter(level, key, existing_value)\n"
405 " return true, false, \"\"\n"
408 "function FilterMergeOperand(level, key, operand)\n"
412 " return \"RemovesAll\"\n"
416 std::string key
= "another-key";
417 std::string value
= "another-value";
418 kvs
.insert({key
, value
});
419 ASSERT_OK(db_
->Put(WriteOptions(), key
, value
));
420 db_
->Flush(FlushOptions());
423 cr_opt
.change_level
= true;
424 cr_opt
.target_level
= 5;
425 // Issue full compaction and expect nothing is in the DB.
426 ASSERT_OK(db_
->CompactRange(cr_opt
, nullptr, nullptr));
428 for (auto const& entry
: kvs
) {
430 auto s
= db_
->Get(ReadOptions(), entry
.first
, &value
);
431 ASSERT_TRUE(s
.IsNotFound());
435 TEST_F(RocksLuaTest
, LuaConditionalTypeError
) {
436 std::string db_path
= test::PerThreadDBPath(temp_dir_
, "rocks_lua_test");
438 lua::RocksLuaCompactionFilterOptions lua_opt
;
439 // Filter() error when input key's initial >= 'r'
441 "function Filter(level, key, existing_value)\n"
442 " if existing_value:sub(1,1) >= 'r' then\n"
443 " return true, 2, \"\" -- incorrect type of 2nd return value\n"
445 " return true, false, \"\"\n"
448 "function FilterMergeOperand(level, key, operand)\n"
452 " return \"BuggyCode\"\n"
456 std::unordered_map
<std::string
, std::string
> kvs
;
457 // Create DB with 10 files
458 CreateDBWithLuaCompactionFilter(lua_opt
, db_path
, &kvs
, 10);
460 // Issue full compaction and expect all keys which initial is < 'r'
461 // will be deleted as we keep the key value when we hit an error.
462 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
464 for (auto const& entry
: kvs
) {
466 auto s
= db_
->Get(ReadOptions(), entry
.first
, &value
);
467 if (entry
.second
[0] < 'r') {
468 ASSERT_TRUE(s
.IsNotFound());
471 ASSERT_EQ(value
, entry
.second
);
476 } // namespace rocksdb
478 int main(int argc
, char** argv
) {
479 rocksdb::port::InstallStackTraceHandler();
480 ::testing::InitGoogleTest(&argc
, argv
);
481 return RUN_ALL_TESTS();
486 int main(int /*argc*/, char** /*argv*/) {
487 printf("LUA_PATH is not set. Ignoring the test.\n");
490 #endif // defined(LUA)
494 int main(int /*argc*/, char** /*argv*/) {
495 printf("Lua is not supported in RocksDBLite. Ignoring the test.\n");
498 #endif // !defined(ROCKSDB_LITE)