1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include <sys/mount.h>
20 #include "kv/KeyValueDB.h"
21 #include "kv/RocksDBStore.h"
22 #include "include/Context.h"
23 #include "common/ceph_argparse.h"
24 #include "global/global_init.h"
25 #include "common/Cond.h"
26 #include "common/errno.h"
27 #include "include/stringify.h"
28 #include <gtest/gtest.h>
// Value-parameterized gtest fixture for the KeyValueDB tests below.
// The const char* test parameter is forwarded (as a std::string) to
// KeyValueDB::create(); presumably it names the backend -- confirm.
// NOTE(review): this listing skips some of its own line numbers, so
// statements between the visible ones may be missing from this view.
32 class KVTest
: public ::testing::TestWithParam
<const char*> {
// Store under test, held by the fixture.
34 boost::scoped_ptr
<KeyValueDB
> db
;
// Builds a std::string from a bufferlist's c_str() and length().
38 string
_bl_to_str(bufferlist val
) {
39 string
str(val
.c_str(), val
.length());
// Runs "rm -r <path>" through ::system(), echoing the command on cout.
// The visible message reports the exit code and continues regardless.
// NOTE(review): path is concatenated into the shell command unquoted.
43 void rm_r(string path
) {
44 string cmd
= string("rm -r ") + path
;
45 cout
<< "==> " << cmd
<< std::endl
;
46 int r
= ::system(cmd
.c_str());
48 cerr
<< "failed with exit code " << r
49 << ", continuing anyway" << std::endl
;
// Announces the parameter and (re)creates db through KeyValueDB::create().
// The enclosing function header and the rest of the call are not visible.
54 cout
<< "Creating " << string(GetParam()) << "\n";
55 db
.reset(KeyValueDB::create(g_ceph_context
, string(GetParam()),
// Creates the scratch directory used by the tests; an already existing
// directory (EEXIST) is tolerated.
62 void SetUp() override
{
63 int r
= ::mkdir("kv_test_temp_dir", 0777);
64 if (r
< 0 && errno
!= EEXIST
) {
// NOTE(review): a line between the check and this message is not visible
// here; confirm r holds an errno-style value before cpp_strerror(r).
66 cerr
<< __func__
<< ": unable to create kv_test_temp_dir: "
67 << cpp_strerror(r
) << std::endl
;
// Removes the scratch directory after each test.
72 void TearDown() override
{
74 rm_r("kv_test_temp_dir");
// Checks that create_and_open() on a fresh store returns 0.
// (Any teardown statements after the assertion are not visible here.)
78 TEST_P(KVTest
, OpenClose
) {
79 ASSERT_EQ(0, db
->create_and_open(cout
));
// Checks that a store can be created and opened, and that a later plain
// open() also returns 0. The steps between the two calls are not visible.
85 TEST_P(KVTest
, OpenCloseReopenClose
) {
86 ASSERT_EQ(0, db
->create_and_open(cout
));
89 ASSERT_EQ(0, db
->open(cout
));
94 * Basic write and read test case in same database session.
// Writes three keys under "prefix" in one synchronous transaction and reads
// two of them back in the same session, checking length and content.
96 TEST_P(KVTest
, OpenWriteRead
) {
97 ASSERT_EQ(0, db
->create_and_open(cout
));
99 KeyValueDB::Transaction t
= db
->get_transaction();
// `value` is declared on a line not visible here; presumably it is reset
// between the appends below -- confirm.
101 value
.append("value");
102 t
->set("prefix", "key", value
);
104 value
.append("value2");
105 t
->set("prefix", "key2", value
);
107 value
.append("value3");
108 t
->set("prefix", "key3", value
);
109 db
->submit_transaction_sync(t
);
112 ASSERT_EQ(0, db
->get("prefix", "key", &v1
));
113 ASSERT_EQ(v1
.length(), 5u);
// NOTE(review): this stores a NUL at index length(), i.e. one past the
// bytes counted by length(); confirm the underlying buffer has room, or
// compare using (c_str(), length()) instead.
114 (v1
.c_str())[v1
.length()] = 0x0;
115 ASSERT_EQ(std::string(v1
.c_str()), std::string("value"));
116 ASSERT_EQ(0, db
->get("prefix", "key2", &v2
));
117 ASSERT_EQ(v2
.length(), 6u);
// NOTE(review): same one-past-length() write as for v1 above.
118 (v2
.c_str())[v2
.length()] = 0x0;
119 ASSERT_EQ(std::string(v2
.c_str()), std::string("value2"));
// Persistence check across reopen: writes three keys, reopens and reads
// them, removes two of them, reopens again and expects -ENOENT for the
// removed keys while "key2" is still readable.
124 TEST_P(KVTest
, PutReopen
) {
125 ASSERT_EQ(0, db
->create_and_open(cout
));
// Phase 1: store the same 5-byte value under key, key2 and key3.
127 KeyValueDB::Transaction t
= db
->get_transaction();
129 value
.append("value");
130 t
->set("prefix", "key", value
);
131 t
->set("prefix", "key2", value
);
132 t
->set("prefix", "key3", value
);
133 db
->submit_transaction_sync(t
);
// Phase 2: after open(), the values must still be there.
138 ASSERT_EQ(0, db
->open(cout
));
141 ASSERT_EQ(0, db
->get("prefix", "key", &v1
));
142 ASSERT_EQ(v1
.length(), 5u);
143 ASSERT_EQ(0, db
->get("prefix", "key2", &v2
));
144 ASSERT_EQ(v2
.length(), 5u);
// Phase 3: delete key and key3, leaving key2.
147 KeyValueDB::Transaction t
= db
->get_transaction();
148 t
->rmkey("prefix", "key");
149 t
->rmkey("prefix", "key3");
150 db
->submit_transaction_sync(t
);
// Phase 4: after another open(), only key2 remains.
155 ASSERT_EQ(0, db
->open(cout
));
157 bufferlist v1
, v2
, v3
;
158 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key", &v1
));
159 ASSERT_EQ(0, db
->get("prefix", "key2", &v2
));
160 ASSERT_EQ(v2
.length(), 5u);
161 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key3", &v3
));
// Commit-latency benchmark: primes the store with 30 synchronous "big<i>"
// writes, then performs n synchronous "key<i>" writes and prints the
// elapsed time and the average latency per commit.
// `big`, `data` and `n` are defined on lines not visible here.
166 TEST_P(KVTest
, BenchCommit
) {
168 ASSERT_EQ(0, db
->create_and_open(cout
));
// NOTE(review): the clock starts before priming, so the reported duration
// appears to include the priming writes unless start is reset on a line
// not shown -- confirm.
169 utime_t start
= ceph_clock_now();
171 cout
<< "priming" << std::endl
;
// 1048576-byte buffer; presumably the payload of `big` -- confirm.
174 bufferptr
bp(1048576);
177 for (int i
=0; i
<30; ++i
) {
178 KeyValueDB::Transaction t
= db
->get_transaction();
179 t
->set("prefix", "big" + stringify(i
), big
);
180 db
->submit_transaction_sync(t
);
183 cout
<< "now doing small writes" << std::endl
;
// One transaction per key, each submitted synchronously.
188 for (int i
=0; i
<n
; ++i
) {
189 KeyValueDB::Transaction t
= db
->get_transaction();
190 t
->set("prefix", "key" + stringify(i
), data
);
191 db
->submit_transaction_sync(t
);
193 utime_t end
= ceph_clock_now();
194 utime_t dur
= end
- start
;
195 cout
<< n
<< " commits in " << dur
<< ", avg latency " << (dur
/ (double)n
)
// Merge operator used by the merge tests: concatenates the right-hand
// operand onto the existing value, and prefixes "?" when no value exists.
200 struct AppendMOP
: public KeyValueDB::MergeOperator
{
// No existing value: the result is "?" followed by the operand bytes.
201 void merge_nonexistent(
202 const char *rdata
, size_t rlen
, std::string
*new_value
) override
{
203 *new_value
= "?" + std::string(rdata
, rlen
);
// Existing value: the result is left bytes followed by right bytes.
// (The header line of this override is not visible here.)
206 const char *ldata
, size_t llen
,
207 const char *rdata
, size_t rlen
,
208 std::string
*new_value
) override
{
209 *new_value
= std::string(ldata
, llen
) + std::string(rdata
, rlen
);
211 // We use each operator name and each prefix to construct the
212 // overall RocksDB operator name for consistency check at open time.
213 const char *name() const override
{
// Returns the bufferlist contents as a std::string built from
// c_str() and length().
218 string
tostr(bufferlist
& b
) {
219 return string(b
.c_str(),b
.length());
// Exercises merge() with AppendMOP registered for prefix "A": a merge on a
// key with no value yields "?3", and a further merge of "1" yields "?31".
222 TEST_P(KVTest
, Merge
) {
223 shared_ptr
<KeyValueDB::MergeOperator
> p(new AppendMOP
);
224 int r
= db
->set_merge_operator("A",p
);
// The condition guarding this early return is on a line not visible here.
226 return; // No merge operators for this database type
227 ASSERT_EQ(0, db
->create_and_open(cout
));
229 KeyValueDB::Transaction t
= db
->get_transaction();
230 bufferlist v1
, v2
, v3
;
231 v1
.append(string("1"));
232 v2
.append(string("2"));
233 v3
.append(string("3"));
234 t
->set("P", "K1", v1
);
235 t
->set("A", "A1", v2
);
// A2 is expected to have no prior value when merged (see "?3" below).
237 t
->merge("A", "A2", v3
);
238 db
->submit_transaction_sync(t
);
241 bufferlist v1
, v2
, v3
;
242 ASSERT_EQ(0, db
->get("P", "K1", &v1
));
243 ASSERT_EQ(tostr(v1
), "1");
244 ASSERT_EQ(0, db
->get("A", "A1", &v2
));
245 ASSERT_EQ(tostr(v2
), "2");
246 ASSERT_EQ(0, db
->get("A", "A2", &v3
));
247 ASSERT_EQ(tostr(v3
), "?3");
// Second merge on the now-existing A2 appends to the stored value.
250 KeyValueDB::Transaction t
= db
->get_transaction();
252 v1
.append(string("1"));
253 t
->merge("A", "A2", v1
);
254 db
->submit_transaction_sync(t
);
258 ASSERT_EQ(0, db
->get("A", "A2", &v
));
259 ASSERT_EQ(tostr(v
), "?31");
// Exercises rm_range_keys(): the assertions below expect the start key to
// be removed and the end key to survive, an empty range to remove nothing,
// and a range spanning all "key*" names to remove everything.
264 TEST_P(KVTest
, RMRange
) {
265 ASSERT_EQ(0, db
->create_and_open(cout
));
267 value
.append("value");
// Seed key1..key6 plus key45.
269 KeyValueDB::Transaction t
= db
->get_transaction();
270 t
->set("prefix", "key1", value
);
271 t
->set("prefix", "key2", value
);
272 t
->set("prefix", "key3", value
);
273 t
->set("prefix", "key4", value
);
274 t
->set("prefix", "key45", value
);
275 t
->set("prefix", "key5", value
);
276 t
->set("prefix", "key6", value
);
277 db
->submit_transaction_sync(t
);
// key7 and key8 are set in the same transaction that removes [key2, key7).
281 KeyValueDB::Transaction t
= db
->get_transaction();
282 t
->set("prefix", "key7", value
);
283 t
->set("prefix", "key8", value
);
284 t
->rm_range_keys("prefix", "key2", "key7");
285 db
->submit_transaction_sync(t
);
287 ASSERT_EQ(0, db
->get("prefix", "key1", &v1
));
289 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key45", &v1
));
290 ASSERT_EQ(0, db
->get("prefix", "key8", &v1
));
292 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key2", &v1
));
293 ASSERT_EQ(0, db
->get("prefix", "key7", &v2
));
// Identical start and end: nothing is expected to be removed.
297 KeyValueDB::Transaction t
= db
->get_transaction();
298 t
->rm_range_keys("prefix", "key", "key");
299 db
->submit_transaction_sync(t
);
301 ASSERT_EQ(0, db
->get("prefix", "key1", &v1
));
302 ASSERT_EQ(0, db
->get("prefix", "key8", &v2
));
// "key-" .. "key~" brackets every remaining key name.
306 KeyValueDB::Transaction t
= db
->get_transaction();
307 t
->rm_range_keys("prefix", "key-", "key~");
308 db
->submit_transaction_sync(t
);
310 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key1", &v1
));
311 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key8", &v2
));
// rm_range_keys() on a store opened with sharding definition "O(7)=":
// writes key000..key999 under "O", removes the range key277..key467 and
// expects -ENOENT exactly for 277 <= i < 467.
317 TEST_P(KVTest
, ShardingRMRange
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
318 if(string(GetParam()) != "rocksdb")
320 std::string
cfs("O(7)=");
321 ASSERT_EQ(0, db
->create_and_open(cout
, cfs
));
323 KeyValueDB::Transaction t
= db
->get_transaction();
324 for (size_t i
= 0; i
< 1000; i
++) {
// "key" plus three zero-padded digits is 6 characters, hence the 6.
// NOTE(review): i is size_t but the format is %ld; %zu would match the
// type. Also confirm the asprintf() buffer is freed on a line not shown.
327 ASSERT_EQ(asprintf(&a
, "key%3.3ld", i
), 6);
329 t
->set("O", a
, value
);
332 db
->submit_transaction_sync(t
);
336 KeyValueDB::Transaction t
= db
->get_transaction();
337 t
->rm_range_keys("O", "key277", "key467");
338 db
->submit_transaction_sync(t
);
341 for (size_t i
= 0; i
< 1000; i
++) {
// NOTE(review): same %ld / size_t mismatch as above.
343 ASSERT_EQ(asprintf(&key
, "key%3.3ld", i
), 6);
345 int r
= db
->get("O", key
, &value
);
346 ASSERT_EQ(r
, (i
>= 277 && i
< 467 ? -ENOENT
: 0));
// Column-family round trip: opens the store with "cf1 cf2", writes one key
// into the default prefix and one into each column family, reopens and
// reads them, deletes two of them, reopens and checks what is left.
354 TEST_P(KVTest
, RocksDBColumnFamilyTest
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
355 if(string(GetParam()) != "rocksdb")
358 std::string
cfs("cf1 cf2");
359 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
360 cout
<< "creating two column families and opening them" << std::endl
;
361 ASSERT_EQ(0, db
->create_and_open(cout
, cfs
));
363 KeyValueDB::Transaction t
= db
->get_transaction();
365 value
.append("value");
366 cout
<< "write a transaction includes three keys in different CFs" << std::endl
;
367 t
->set("prefix", "key", value
);
368 t
->set("cf1", "key", value
);
369 t
->set("cf2", "key2", value
);
370 ASSERT_EQ(0, db
->submit_transaction_sync(t
));
375 ASSERT_EQ(0, db
->open(cout
, cfs
));
377 bufferlist v1
, v2
, v3
;
378 cout
<< "reopen db and read those keys" << std::endl
;
// Each content check asserts that "differs from "value"" is false (0).
379 ASSERT_EQ(0, db
->get("prefix", "key", &v1
));
380 ASSERT_EQ(0, _bl_to_str(v1
) != "value");
381 ASSERT_EQ(0, db
->get("cf1", "key", &v2
));
382 ASSERT_EQ(0, _bl_to_str(v2
) != "value");
383 ASSERT_EQ(0, db
->get("cf2", "key2", &v3
));
// NOTE(review): this re-checks v2 right after reading into v3; it looks
// like v3 was intended, so the cf2 content is never verified -- confirm.
384 ASSERT_EQ(0, _bl_to_str(v2
) != "value");
387 cout
<< "delete two keys in CFs" << std::endl
;
388 KeyValueDB::Transaction t
= db
->get_transaction();
389 t
->rmkey("prefix", "key");
390 t
->rmkey("cf2", "key2");
391 ASSERT_EQ(0, db
->submit_transaction_sync(t
));
396 ASSERT_EQ(0, db
->open(cout
, cfs
));
398 cout
<< "reopen db and read keys again." << std::endl
;
399 bufferlist v1
, v2
, v3
;
// Only the cf1 key was left in place.
400 ASSERT_EQ(-ENOENT
, db
->get("prefix", "key", &v1
));
401 ASSERT_EQ(0, db
->get("cf1", "key", &v2
));
402 ASSERT_EQ(0, _bl_to_str(v2
) != "value");
403 ASSERT_EQ(-ENOENT
, db
->get("cf2", "key2", &v3
));
// Iterator check: writes key1/key2 into the default prefix and into column
// family "cf1", then walks each with get_iterator() and expects key1 then
// key2 with values "hello" and "world".
// bl1 and bl2 are defined on lines not visible here.
408 TEST_P(KVTest
, RocksDBIteratorTest
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
409 if(string(GetParam()) != "rocksdb")
412 std::string
cfs("cf1");
413 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
414 cout
<< "creating one column family and opening it" << std::endl
;
415 ASSERT_EQ(0, db
->create_and_open(cout
, cfs
));
417 KeyValueDB::Transaction t
= db
->get_transaction();
422 cout
<< "write some kv pairs into default and new CFs" << std::endl
;
423 t
->set("prefix", "key1", bl1
);
424 t
->set("prefix", "key2", bl2
);
425 t
->set("cf1", "key1", bl1
);
426 t
->set("cf1", "key2", bl2
);
427 ASSERT_EQ(0, db
->submit_transaction_sync(t
));
// Walk the default prefix.
430 cout
<< "iterating the default CF" << std::endl
;
431 KeyValueDB::Iterator iter
= db
->get_iterator("prefix");
432 iter
->seek_to_first();
433 ASSERT_EQ(1, iter
->valid());
434 ASSERT_EQ("key1", iter
->key());
435 ASSERT_EQ("hello", _bl_to_str(iter
->value()));
436 ASSERT_EQ(0, iter
->next());
437 ASSERT_EQ(1, iter
->valid());
438 ASSERT_EQ("key2", iter
->key());
439 ASSERT_EQ("world", _bl_to_str(iter
->value()));
// Same walk over the column family.
442 cout
<< "iterating the new CF" << std::endl
;
443 KeyValueDB::Iterator iter
= db
->get_iterator("cf1");
444 iter
->seek_to_first();
445 ASSERT_EQ(1, iter
->valid());
446 ASSERT_EQ("key1", iter
->key());
447 ASSERT_EQ("hello", _bl_to_str(iter
->value()));
448 ASSERT_EQ(0, iter
->next());
449 ASSERT_EQ(1, iter
->valid());
450 ASSERT_EQ("key2", iter
->key());
451 ASSERT_EQ("world", _bl_to_str(iter
->value()));
// Iterator order on a sharded prefix: stores keys "100".."999" under "A"
// (sharding definition "A(6)") and checks that a prefix iterator visits
// them in ascending and then descending order, ending invalid each time.
// The iterator-advance calls and the initial values of `pos` are on lines
// not visible here.
456 TEST_P(KVTest
, RocksDBShardingIteratorTest
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
457 if(string(GetParam()) != "rocksdb")
460 std::string
cfs("A(6)");
461 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
462 cout
<< "creating one column family and opening it" << std::endl
;
463 ASSERT_EQ(0, db
->create_and_open(cout
, cfs
));
465 KeyValueDB::Transaction t
= db
->get_transaction();
466 for (int v
= 100; v
<= 999; v
++) {
467 std::string str
= to_string(v
);
// `val` is built on lines not visible; the reads below expect it to equal
// the key text.
470 t
->set("A", str
, val
);
472 ASSERT_EQ(0, db
->submit_transaction_sync(t
));
475 KeyValueDB::Iterator it
= db
->get_iterator("A");
// Forward pass.
477 ASSERT_EQ(it
->lower_bound(to_string(pos
)), 0);
478 for (pos
= 100; pos
<= 999; pos
++) {
479 ASSERT_EQ(it
->valid(), true);
480 ASSERT_EQ(it
->key(), to_string(pos
));
481 ASSERT_EQ(it
->value().to_str(), to_string(pos
));
484 ASSERT_EQ(it
->valid(), false);
// Backward pass.
486 ASSERT_EQ(it
->lower_bound(to_string(pos
)), 0);
487 for (pos
= 999; pos
>= 100; pos
--) {
488 ASSERT_EQ(it
->valid(), true);
489 ASSERT_EQ(it
->key(), to_string(pos
));
490 ASSERT_EQ(it
->value().to_str(), to_string(pos
));
493 ASSERT_EQ(it
->valid(), false);
// Same scenario as the Merge test, but with AppendMOP registered for the
// column family "cf1": merge on a removed key yields "?3", a second merge
// of "1" yields "?31".
498 TEST_P(KVTest
, RocksDBCFMerge
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
499 if(string(GetParam()) != "rocksdb")
502 shared_ptr
<KeyValueDB::MergeOperator
> p(new AppendMOP
);
503 int r
= db
->set_merge_operator("cf1",p
);
// The condition guarding this early return is on a line not visible here.
505 return; // No merge operators for this database type
506 std::string
cfs("cf1");
507 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
508 cout
<< "creating one column family and opening it" << std::endl
;
509 ASSERT_EQ(0, db
->create_and_open(cout
, cfs
));
512 KeyValueDB::Transaction t
= db
->get_transaction();
513 bufferlist v1
, v2
, v3
;
514 v1
.append(string("1"));
515 v2
.append(string("2"));
516 v3
.append(string("3"));
517 t
->set("P", "K1", v1
);
518 t
->set("cf1", "A1", v2
);
// Remove A2 first so the merge below starts from a nonexistent value.
519 t
->rmkey("cf1", "A2");
520 t
->merge("cf1", "A2", v3
);
521 db
->submit_transaction_sync(t
);
524 bufferlist v1
, v2
, v3
;
525 ASSERT_EQ(0, db
->get("P", "K1", &v1
));
526 ASSERT_EQ(tostr(v1
), "1");
527 ASSERT_EQ(0, db
->get("cf1", "A1", &v2
));
528 ASSERT_EQ(tostr(v2
), "2");
529 ASSERT_EQ(0, db
->get("cf1", "A2", &v3
));
530 ASSERT_EQ(tostr(v3
), "?3");
// Second merge appends to the stored value.
533 KeyValueDB::Transaction t
= db
->get_transaction();
535 v1
.append(string("1"));
536 t
->merge("cf1", "A2", v1
);
537 db
->submit_transaction_sync(t
);
541 ASSERT_EQ(0, db
->get("cf1", "A2", &v
));
542 ASSERT_EQ(tostr(v
), "?31");
// estimate_prefix_size() sanity check: in each of 20 rounds, writes 100
// values of 1000 bytes under prefix "A" with random numeric keys, then
// expects the estimate for "A" to stay within 0.5x..1.5x of the bytes
// written so far, the estimate for keys starting with "1" to be within the
// same band around a tenth of that, and prefix "B" to report 0.
547 TEST_P(KVTest
, RocksDB_estimate_size
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
548 if(string(GetParam()) != "rocksdb")
// NOTE(review): cfs is declared and the message mentions a column family,
// but create_and_open() below is called without cfs -- confirm intent.
551 std::string
cfs("cf1");
552 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
553 cout
<< "creating one column family and opening it" << std::endl
;
554 ASSERT_EQ(0, db
->create_and_open(cout
));
556 for(int test
= 0; test
< 20; test
++)
558 KeyValueDB::Transaction t
= db
->get_transaction();
560 v1
.append(string(1000, '1'));
// rand() is not seeded in the visible code; keys may repeat between rounds.
561 for (int i
= 0; i
< 100; i
++)
562 t
->set("A", to_string(rand()%100000), v1
);
563 db
->submit_transaction_sync(t
);
566 int64_t size_a
= db
->estimate_prefix_size("A","");
567 ASSERT_GT(size_a
, (test
+ 1) * 1000 * 100 * 0.5);
568 ASSERT_LT(size_a
, (test
+ 1) * 1000 * 100 * 1.5);
569 int64_t size_a1
= db
->estimate_prefix_size("A","1");
570 ASSERT_GT(size_a1
, (test
+ 1) * 1000 * 100 * 0.1 * 0.5);
571 ASSERT_LT(size_a1
, (test
+ 1) * 1000 * 100 * 0.1 * 1.5);
572 int64_t size_b
= db
->estimate_prefix_size("B","");
573 ASSERT_EQ(size_b
, 0);
// Column-family variant of the size-estimate check: the store is opened
// with "cf1" and the 20 rounds of 100 x 1000-byte values go into "cf1".
// Expected bands are the same: 0.5x..1.5x of bytes written for the whole
// prefix, the same band around a tenth of that for keys starting with "1",
// and 0 for the unused prefix "B".
579 TEST_P(KVTest
, RocksDB_estimate_size_column_family
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
580 if(string(GetParam()) != "rocksdb")
583 std::string
cfs("cf1");
584 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
585 cout
<< "creating one column family and opening it" << std::endl
;
586 ASSERT_EQ(0, db
->create_and_open(cout
, cfs
));
588 for(int test
= 0; test
< 20; test
++)
590 KeyValueDB::Transaction t
= db
->get_transaction();
592 v1
.append(string(1000, '1'));
// rand() is not seeded in the visible code; keys may repeat between rounds.
593 for (int i
= 0; i
< 100; i
++)
594 t
->set("cf1", to_string(rand()%100000), v1
);
595 db
->submit_transaction_sync(t
);
598 int64_t size_a
= db
->estimate_prefix_size("cf1","");
599 ASSERT_GT(size_a
, (test
+ 1) * 1000 * 100 * 0.5);
600 ASSERT_LT(size_a
, (test
+ 1) * 1000 * 100 * 1.5);
601 int64_t size_a1
= db
->estimate_prefix_size("cf1","1");
602 ASSERT_GT(size_a1
, (test
+ 1) * 1000 * 100 * 0.1 * 0.5);
603 ASSERT_LT(size_a1
, (test
+ 1) * 1000 * 100 * 0.1 * 1.5);
604 int64_t size_b
= db
->estimate_prefix_size("B","");
605 ASSERT_EQ(size_b
, 0);
// Checks RocksDBStore::parse_sharding_def(): one well-formed definition is
// parsed into three ColumnFamily entries, and two malformed definitions
// must be rejected with an error position and a non-empty message.
// `result` and the trailing arguments of each parse call are on lines not
// visible here.
611 TEST_P(KVTest
, RocksDB_parse_sharding_def
) {
// The statement controlled by this check is not visible here; presumably
// an early return for non-rocksdb parameters -- confirm.
612 if(string(GetParam()) != "rocksdb")
616 std::vector
<RocksDBStore::ColumnFamily
> sharding_def
;
617 char const* error_position
= nullptr;
618 std::string error_msg
;
// Case 1: valid definition with a hash range, options and a bare name.
620 std::string_view text_def
= "A(10,0-30) B(6)=option1,option2=aaaa C";
621 result
= RocksDBStore::parse_sharding_def(text_def
,
626 ASSERT_EQ(result
, true);
627 ASSERT_EQ(error_position
, nullptr);
628 ASSERT_EQ(error_msg
, "");
629 std::cout
<< text_def
<< std::endl
;
// NOTE(review): the fatal assert above already requires error_position to
// be nullptr, so this caret print cannot run at this point.
630 if (error_position
) std::cout
<< std::string(error_position
- text_def
.begin(), ' ') << "^" << error_msg
<< std::endl
;
632 ASSERT_EQ(sharding_def
.size(), 3);
633 ASSERT_EQ(sharding_def
[0].name
, "A");
634 ASSERT_EQ(sharding_def
[0].shard_cnt
, 10);
635 ASSERT_EQ(sharding_def
[0].hash_l
, 0);
636 ASSERT_EQ(sharding_def
[0].hash_h
, 30);
638 ASSERT_EQ(sharding_def
[1].name
, "B");
639 ASSERT_EQ(sharding_def
[1].shard_cnt
, 6);
640 ASSERT_EQ(sharding_def
[1].options
, "option1,option2=aaaa");
641 ASSERT_EQ(sharding_def
[2].name
, "C");
// A name without "(n)" is expected to get a shard count of 1.
642 ASSERT_EQ(sharding_def
[2].shard_cnt
, 1);
// Case 2: unterminated "A(10" must be rejected.
645 text_def
= "A(10 B(6)=option C";
646 result
= RocksDBStore::parse_sharding_def(text_def
,
650 std::cout
<< text_def
<< std::endl
;
// Prints a caret under the reported error position.
// NOTE(review): unless guarded on a line not shown, this subtraction runs
// before error_position is asserted non-null below -- confirm.
652 std::cout
<< std::string(error_position
- text_def
.begin(), ' ') << "^" << error_msg
<< std::endl
;
653 ASSERT_EQ(result
, false);
654 ASSERT_NE(error_position
, nullptr);
655 ASSERT_NE(error_msg
, "");
// Case 3: "A(10,1)" must be rejected as well.
657 text_def
= "A(10,1) B(6)=option C";
658 result
= RocksDBStore::parse_sharding_def(text_def
,
662 std::cout
<< text_def
<< std::endl
;
663 std::cout
<< std::string(error_position
- text_def
.begin(), ' ') << "^" << error_msg
<< std::endl
;
664 ASSERT_EQ(result
, false);
665 ASSERT_NE(error_position
, nullptr);
666 ASSERT_NE(error_msg
, "");
// Value-parameterized fixture for whole-space iterator tests on a sharded
// rocksdb store. The const char* parameter is the sharding definition passed
// to create_and_open(). The fixture also generates deterministic key/value
// data sets, loads them into the store and removes them again.
// NOTE(review): this listing skips some of its own line numbers, so
// statements between the visible ones may be missing from this view.
671 class RocksDBShardingTest
: public ::testing::TestWithParam
<const char*> {
673 boost::scoped_ptr
<KeyValueDB
> db
;
675 RocksDBShardingTest() : db(0) {}
// Builds a std::string from a bufferlist's c_str() and length().
677 string
_bl_to_str(bufferlist val
) {
678 string
str(val
.c_str(), val
.length());
// Runs "rm -r <path>" through ::system(); the visible message reports the
// exit code and continues regardless.
// NOTE(review): path is concatenated into the shell command unquoted.
682 void rm_r(string path
) {
683 string cmd
= string("rm -r ") + path
;
685 cout
<< "==> " << cmd
<< std::endl
;
686 int r
= ::system(cmd
.c_str());
688 cerr
<< "failed with exit code " << r
689 << ", continuing anyway" << std::endl
;
// Creates the scratch directory, builds a "rocksdb" store in it and opens
// it with the sharding definition given by the test parameter.
693 void SetUp() override
{
// Verbose output is enabled only when VERBOSE is exactly "1".
694 verbose
= getenv("VERBOSE") && strcmp(getenv("VERBOSE"), "1") == 0;
696 int r
= ::mkdir("kv_test_temp_dir", 0777);
697 if (r
< 0 && errno
!= EEXIST
) {
// NOTE(review): a line between the check and this message is not visible
// here; confirm r holds an errno-style value before cpp_strerror(r).
699 cerr
<< __func__
<< ": unable to create kv_test_temp_dir: "
700 << cpp_strerror(r
) << std::endl
;
703 db
.reset(KeyValueDB::create(g_ceph_context
, "rocksdb",
704 "kv_test_temp_dir"));
705 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
707 cout
<< "Creating database with sharding: " << GetParam() << std::endl
;
708 ASSERT_EQ(0, db
->create_and_open(cout
, GetParam()));
// Removes the scratch directory after each test.
710 void TearDown() override
{
712 rm_r("kv_test_temp_dir");
717 B - shard 1/3 x 0/1/20
719 D - shard 1/3 x 0/1/20
723 std::vector
<std::string
> sharding_defs
= {
727 "Betelgeuse(3) D(3)"};
// Key prefixes used when generating data.
728 std::vector
<std::string
> prefixes
= {"Ad", "Betelgeuse", "C", "D", "Evade"};
// Word pool for keys and values, indexed modulo R.
// NOTE(review): "whenever" and "perfectly" each appear twice in the pool.
729 std::vector
<std::string
> randoms
= {"0", "1", "2", "3", "4", "5",
730 "found", "brain", "fully", "pen", "worth", "race",
731 "stand", "nodded", "whenever", "surrounded", "industrial", "skin",
732 "this", "direction", "family", "beginning", "whenever", "held",
733 "metal", "year", "like", "valuable", "softly", "whistle",
734 "perfectly", "broken", "idea", "also", "coffee", "branch",
735 "tongue", "immediately", "bent", "partly", "burn", "include",
736 "certain", "burst", "final", "smoke", "positive", "perfectly"
738 int R
= randoms
.size();
// A test case is identified by six ints; zero(), end() and next() step
// through the identifiers (their bodies are mostly not visible here).
740 typedef int test_id
[6];
741 void zero(test_id
& x
) {
747 bool end(const test_id
& x
) {
750 void next(test_id
& x
) {
752 for (int i
= 0; i
< 5; i
++) {
// Generated data set, keyed by the combined (prefix, key) string.
760 std::map
<std::string
, std::string
> data
;
// Fills `data` for test case x, walking the five prefixes. The branch
// conditions and the declarations of k and v are on lines not visible.
764 void generate_data(const test_id
& x
) {
766 for (int i
= 0; i
< 5; i
++) {
768 std::cout
<< x
[i
] << "-";
// Single entry for this prefix.
773 data
[RocksDBStore::combine_strings(prefixes
[i
], randoms
[k
++ % R
])] = randoms
[v
++ % R
];
// Ten entries sharing a common "<base>." key stem.
776 std::string base
= randoms
[k
++ % R
];
777 for (int j
= 0; j
< 10; j
++) {
778 data
[RocksDBStore::combine_strings(prefixes
[i
], base
+ "." + randoms
[k
++ % R
])] = randoms
[v
++ % R
];
// Writes every entry of `data` into the store in one synchronous
// transaction (the enclosing function header is not visible here).
786 KeyValueDB::Transaction t
= db
->get_transaction();
787 for (auto &d
: data
) {
792 RocksDBStore::split_key(d
.first
, &prefix
, &key
);
793 t
->set(prefix
, key
, v1
);
795 std::cout
<< "SET " << prefix
<< " " << key
<< std::endl
;
797 ASSERT_EQ(db
->submit_transaction_sync(t
), 0);
// Removes every entry of `data` from the store and verifies the store is
// empty afterwards (the enclosing function header is not visible here).
801 KeyValueDB::Transaction t
= db
->get_transaction();
802 for (auto &d
: data
) {
805 RocksDBStore::split_key(d
.first
, &prefix
, &key
);
806 t
->rmkey(prefix
, key
);
808 ASSERT_EQ(db
->submit_transaction_sync(t
), 0);
809 //paranoid, check if db empty
810 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
811 ASSERT_EQ(it
->seek_to_first(), 0);
812 ASSERT_EQ(it
->valid(), false);
// Walks the whole key space forward with next() and checks that the store
// returns exactly the entries of `data`, in std::map order, with matching
// values. The per-test-case setup, the advance of `dit` and the guard
// around the trace output are on lines not visible here.
816 TEST_P(RocksDBShardingTest
, wholespace_next
) {
823 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
825 auto dit
= data
.begin();
826 int r
= it
->seek_to_first();
// The iterator is valid exactly when there is data to compare against.
828 ASSERT_EQ(it
->valid(), (dit
!= data
.end()));
830 while (dit
!= data
.end()) {
831 ASSERT_EQ(it
->valid(), true);
834 RocksDBStore::split_key(dit
->first
, &prefix
, &key
);
835 auto raw_key
= it
->raw_key();
836 ASSERT_EQ(raw_key
.first
, prefix
);
837 ASSERT_EQ(raw_key
.second
, key
);
838 ASSERT_EQ(it
->value().to_str(), dit
->second
);
840 std::cout
<< "next " << prefix
<< " " << key
<< std::endl
;
841 ASSERT_EQ(it
->next(), 0);
// Past the last entry the iterator must be invalid.
844 ASSERT_EQ(it
->valid(), false);
// Walks the whole key space backward with prev(), starting at
// seek_to_last(), and checks the entries against `data` in reverse
// std::map order. The per-test-case setup, the advance of `dit` and the
// guard around the trace output are on lines not visible here.
851 TEST_P(RocksDBShardingTest
, wholespace_prev
) {
858 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
859 auto dit
= data
.rbegin();
860 int r
= it
->seek_to_last();
// The iterator is valid exactly when there is data to compare against.
862 ASSERT_EQ(it
->valid(), (dit
!= data
.rend()));
864 while (dit
!= data
.rend()) {
865 ASSERT_EQ(it
->valid(), true);
868 RocksDBStore::split_key(dit
->first
, &prefix
, &key
);
869 auto raw_key
= it
->raw_key();
870 ASSERT_EQ(raw_key
.first
, prefix
);
871 ASSERT_EQ(raw_key
.second
, key
);
872 ASSERT_EQ(it
->value().to_str(), dit
->second
);
874 std::cout
<< "prev " << prefix
<< " " << key
<< std::endl
;
875 ASSERT_EQ(it
->prev(), 0);
// Before the first entry the iterator must be invalid.
878 ASSERT_EQ(it
->valid(), false);
// For every entry of `data`, positions a fresh whole-space iterator with
// lower_bound(prefix, key) and checks that it lands exactly on that entry.
// A second iterator (`it`) is stepped in parallel with next().
// The per-test-case setup and the advance of `dit` are not visible here.
885 TEST_P(RocksDBShardingTest
, wholespace_lower_bound
) {
892 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
893 auto dit
= data
.begin();
894 int r
= it
->seek_to_first();
896 ASSERT_EQ(it
->valid(), (dit
!= data
.end()));
898 while (dit
!= data
.end()) {
899 ASSERT_EQ(it
->valid(), true);
902 RocksDBStore::split_key(dit
->first
, &prefix
, &key
);
// lower_bound on an existing key must return that very key.
903 KeyValueDB::WholeSpaceIterator it1
= db
->get_wholespace_iterator();
904 ASSERT_EQ(it1
->lower_bound(prefix
, key
), 0);
905 ASSERT_EQ(it1
->valid(), true);
906 auto raw_key
= it1
->raw_key();
907 ASSERT_EQ(raw_key
.first
, prefix
);
908 ASSERT_EQ(raw_key
.second
, key
);
910 std::cout
<< "lower_bound " << prefix
<< " " << key
<< std::endl
;
911 ASSERT_EQ(it
->next(), 0);
914 ASSERT_EQ(it
->valid(), false);
// For every entry of `data`, builds a key just below it (last character
// decremented by one) and checks that upper_bound(prefix, that key) lands
// on the entry itself. A second iterator (`it`) is stepped with next().
// The per-test-case setup and the advance of `dit` are not visible here.
921 TEST_P(RocksDBShardingTest
, wholespace_upper_bound
) {
928 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
929 auto dit
= data
.begin();
930 int r
= it
->seek_to_first();
932 ASSERT_EQ(it
->valid(), (dit
!= data
.end()));
934 while (dit
!= data
.end()) {
935 ASSERT_EQ(it
->valid(), true);
939 RocksDBStore::split_key(dit
->first
, &prefix
, &key
);
940 //decrement key minimally
// NOTE(review): this indexes key[key.length() - 1], so it assumes every
// generated key is non-empty -- confirm against generate_data().
941 key_minus_1
= key
.substr(0, key
.length() - 1) + std::string(1, key
[key
.length() - 1] - 1);
942 KeyValueDB::WholeSpaceIterator it1
= db
->get_wholespace_iterator();
943 ASSERT_EQ(it1
->upper_bound(prefix
, key_minus_1
), 0);
944 ASSERT_EQ(it1
->valid(), true);
945 auto raw_key
= it1
->raw_key();
946 ASSERT_EQ(raw_key
.first
, prefix
);
947 ASSERT_EQ(raw_key
.second
, key
);
949 std::cout
<< "upper_bound " << prefix
<< " " << key_minus_1
<< std::endl
;
950 ASSERT_EQ(it
->next(), 0);
953 ASSERT_EQ(it
->valid(), false);
// Boundary lookups on the whole key space: lower_bound(" ", " ") must land
// on the first entry of `data`, and lower_bound("~", "~") must leave the
// iterator invalid. The per-test-case setup is on lines not visible here.
960 TEST_P(RocksDBShardingTest
, wholespace_lookup_limits
) {
967 //lookup before first
968 if (data
.size() > 0) {
969 auto dit
= data
.begin();
972 RocksDBStore::split_key(dit
->first
, &prefix
, &key
);
973 KeyValueDB::WholeSpaceIterator it1
= db
->get_wholespace_iterator();
974 ASSERT_EQ(it1
->lower_bound(" ", " "), 0);
975 ASSERT_EQ(it1
->valid(), true);
976 auto raw_key
= it1
->raw_key();
977 ASSERT_EQ(raw_key
.first
, prefix
);
978 ASSERT_EQ(raw_key
.second
, key
);
// Lookup past the end: "~" is used as an upper limit for all prefixes.
981 KeyValueDB::WholeSpaceIterator it1
= db
->get_wholespace_iterator();
982 ASSERT_EQ(it1
->lower_bound("~", "~"), 0);
983 ASSERT_EQ(it1
->valid(), false);
// Fixture for the resharding tests. It holds a RocksDBStore directly (the
// tests call reshard()), generates a random data set, writes it to the
// store, and can verify or remove it afterwards.
// NOTE(review): this listing skips some of its own line numbers, so
// statements between the visible ones may be missing from this view.
992 class RocksDBResharding
: public ::testing::Test
{
994 boost::scoped_ptr
<RocksDBStore
> db
;
996 RocksDBResharding() : db(0) {}
// Builds a std::string from a bufferlist's c_str() and length().
998 string
_bl_to_str(bufferlist val
) {
999 string
str(val
.c_str(), val
.length());
// Runs "rm -r <path>" through ::system(); the visible message reports the
// exit code and continues regardless.
// NOTE(review): path is concatenated into the shell command unquoted.
1003 void rm_r(string path
) {
1004 string cmd
= string("rm -r ") + path
;
1006 cout
<< "==> " << cmd
<< std::endl
;
1007 int r
= ::system(cmd
.c_str());
1009 cerr
<< "failed with exit code " << r
1010 << ", continuing anyway" << std::endl
;
// Creates the scratch directory and a "rocksdb" store in it, then calls
// init(); opening is left to the individual tests.
1014 void SetUp() override
{
// Verbose output is enabled only when VERBOSE is exactly "1".
1015 verbose
= getenv("VERBOSE") && strcmp(getenv("VERBOSE"), "1") == 0;
1017 int r
= ::mkdir("kv_test_temp_dir", 0777);
1018 if (r
< 0 && errno
!= EEXIST
) {
// NOTE(review): a line between the check and this message is not visible
// here; confirm r holds an errno-style value before cpp_strerror(r).
1020 cerr
<< __func__
<< ": unable to create kv_test_temp_dir: "
1021 << cpp_strerror(r
) << std::endl
;
// KeyValueDB::create() returns the base type; the fixture needs the
// RocksDBStore interface, so the downcast must succeed.
1025 KeyValueDB
* db_kv
= KeyValueDB::create(g_ceph_context
, "rocksdb",
1026 "kv_test_temp_dir");
1027 RocksDBStore
* db_rocks
= dynamic_cast<RocksDBStore
*>(db_kv
);
1028 ceph_assert(db_rocks
);
1030 ASSERT_EQ(0, db
->init(g_conf()->bluestore_rocksdb_options
));
// Removes the scratch directory after each test.
1032 void TearDown() override
{
1034 rm_r("kv_test_temp_dir");
// Key prefixes used when generating data.
1038 std::vector
<std::string
> prefixes
= {"Ad", "Betelgeuse", "C", "D", "Evade"};
// Word pool for keys and values, indexed modulo R.
// NOTE(review): "whenever" and "perfectly" each appear twice in the pool.
1039 std::vector
<std::string
> randoms
= {"0", "1", "2", "3", "4", "5",
1040 "found", "brain", "fully", "pen", "worth", "race",
1041 "stand", "nodded", "whenever", "surrounded", "industrial", "skin",
1042 "this", "direction", "family", "beginning", "whenever", "held",
1043 "metal", "year", "like", "valuable", "softly", "whistle",
1044 "perfectly", "broken", "idea", "also", "coffee", "branch",
1045 "tongue", "immediately", "bent", "partly", "burn", "include",
1046 "certain", "burst", "final", "smoke", "positive", "perfectly"
1048 int R
= randoms
.size();
// Generated data set, keyed by the combined (prefix, key) string.
1050 std::map
<std::string
, std::string
> data
;
// Fills `data`: prefix number p gets up to 1 << (3p + 3) entries, i.e.
// 8, 64, 512, 4096 and 32768 insert attempts. Keys are five random words
// concatenated, values three; duplicate keys overwrite earlier entries.
1052 void generate_data() {
1054 for (size_t p
= 0; p
< prefixes
.size(); p
++) {
1055 size_t elem_count
= 1 << (( p
* 3 ) + 3);
1056 for (size_t i
= 0; i
< elem_count
; i
++) {
1058 for (int x
= 0; x
< 5; x
++) {
1059 key
= key
+ randoms
[rand() % R
];
1062 for (int x
= 0; x
< 3; x
++) {
1063 value
= value
+ randoms
[rand() % R
];
1065 data
[RocksDBStore::combine_strings(prefixes
[p
], key
)] = value
;
// Writes `data` into the store, submitting a transaction each time the
// counter i is a multiple of 1000 and once more at the end. The enclosing
// function header and the counter update are on lines not visible here.
1071 KeyValueDB::Transaction t
= db
->get_transaction();
1073 for (auto& d
: data
) {
1075 v1
.append(d
.second
);
1078 RocksDBStore::split_key(d
.first
, &prefix
, &key
);
1079 t
->set(prefix
, key
, v1
);
1081 std::cout
<< "SET " << prefix
<< " " << key
<< std::endl
;
1083 if ((i
% 1000) == 0) {
1084 ASSERT_EQ(db
->submit_transaction_sync(t
), 0);
1087 std::cout
<< "writing key to DB" << std::endl
;
1088 t
= db
->get_transaction();
1092 std::cout
<< "writing keys to DB" << std::endl
;
1093 ASSERT_EQ(db
->submit_transaction_sync(t
), 0);
// Removes every entry of `data` from the store and verifies the store is
// empty afterwards (the enclosing function header is not visible here).
1097 KeyValueDB::Transaction t
= db
->get_transaction();
1098 for (auto &d
: data
) {
1101 RocksDBStore::split_key(d
.first
, &prefix
, &key
);
1102 t
->rmkey(prefix
, key
);
1104 ASSERT_EQ(db
->submit_transaction_sync(t
), 0);
1105 //paranoid, check if db empty
1106 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
1107 ASSERT_EQ(it
->seek_to_first(), 0);
1108 ASSERT_EQ(it
->valid(), false);
// Verifies that a forward whole-space walk returns exactly the entries of
// `data`, in std::map order (the enclosing function header and the advance
// of `dit` are on lines not visible here).
1112 KeyValueDB::WholeSpaceIterator it
= db
->get_wholespace_iterator();
1114 auto dit
= data
.begin();
1115 int r
= it
->seek_to_first();
1117 ASSERT_EQ(it
->valid(), (dit
!= data
.end()));
1119 while (dit
!= data
.end()) {
1120 ASSERT_EQ(it
->valid(), true);
1123 RocksDBStore::split_key(dit
->first
, &prefix
, &key
);
1124 auto raw_key
= it
->raw_key();
1125 ASSERT_EQ(raw_key
.first
, prefix
);
1126 ASSERT_EQ(raw_key
.second
, key
);
1127 ASSERT_EQ(it
->value().to_str(), dit
->second
);
1129 std::cout
<< "next " << prefix
<< " " << key
<< std::endl
;
1130 ASSERT_EQ(it
->next(), 0);
1133 ASSERT_EQ(it
->valid(), false);
// Creates an unsharded store (empty sharding definition), reshards it to
// "Evade(4)" and checks that it opens afterwards. Data setup, close and
// verification steps are on lines not visible here.
1137 TEST_F(RocksDBResharding
, basic
) {
1138 ASSERT_EQ(0, db
->create_and_open(cout
, ""));
1143 ASSERT_EQ(db
->reshard("Evade(4)"), 0);
1144 ASSERT_EQ(db
->open(cout
), 0);
// Reshards an unsharded store so that each of the five prefixes gets its
// own single-shard definition, then checks that the store opens.
// Data setup, close and verification steps are not visible here.
1149 TEST_F(RocksDBResharding
, all_to_shards
) {
1150 ASSERT_EQ(0, db
->create_and_open(cout
, ""));
1155 ASSERT_EQ(db
->reshard("Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"), 0);
1156 ASSERT_EQ(db
->open(cout
), 0);
// Reshards every prefix into its own shard and then back to the empty
// sharding definition, checking that the store opens after each step.
// Data setup, close and verification steps are not visible here.
1161 TEST_F(RocksDBResharding
, all_to_shards_and_back_again
) {
1162 ASSERT_EQ(0, db
->create_and_open(cout
, ""));
1167 ASSERT_EQ(db
->reshard("Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"), 0);
1168 ASSERT_EQ(db
->open(cout
), 0);
// Undo the sharding again.
1171 ASSERT_EQ(db
->reshard(""), 0);
1172 ASSERT_EQ(db
->open(cout
), 0);
// Interrupted reshard, failure injected after the first batch: reshard()
// must return -1000, the store must then refuse to open, and a second
// reshard() to the same target must succeed and make it openable again.
1177 TEST_F(RocksDBResharding
, resume_interrupted_at_batch
) {
1178 ASSERT_EQ(0, db
->create_and_open(cout
, ""));
1183 RocksDBStore::resharding_ctrl ctrl
;
1184 ctrl
.unittest_fail_after_first_batch
= true;
1185 ASSERT_EQ(db
->reshard("Evade(4)", &ctrl
), -1000);
// A half-resharded store must not open.
1186 ASSERT_NE(db
->open(cout
), 0);
1187 ASSERT_EQ(db
->reshard("Evade(4)"), 0);
1188 ASSERT_EQ(db
->open(cout
), 0);
// Interrupted reshard, failure injected after processing a column:
// reshard() must return -1001, the store must then refuse to open, and a
// second reshard() to the same target must succeed.
1193 TEST_F(RocksDBResharding
, resume_interrupted_at_column
) {
1194 ASSERT_EQ(0, db
->create_and_open(cout
, ""));
1199 RocksDBStore::resharding_ctrl ctrl
;
1200 ctrl
.unittest_fail_after_processing_column
= true;
1201 ASSERT_EQ(db
->reshard("Evade(4)", &ctrl
), -1001);
// A half-resharded store must not open.
1202 ASSERT_NE(db
->open(cout
), 0);
1203 ASSERT_EQ(db
->reshard("Evade(4)"), 0);
1204 ASSERT_EQ(db
->open(cout
), 0);
// Interrupted reshard, failure injected after successful processing:
// reshard() must return -1002, the store must then refuse to open, and a
// second reshard() to the same target must succeed.
1209 TEST_F(RocksDBResharding
, resume_interrupted_before_commit
) {
1210 ASSERT_EQ(0, db
->create_and_open(cout
, ""));
1215 RocksDBStore::resharding_ctrl ctrl
;
1216 ctrl
.unittest_fail_after_successful_processing
= true;
1217 ASSERT_EQ(db
->reshard("Evade(4)", &ctrl
), -1002);
// The reshard was not finalized, so the store must not open.
1218 ASSERT_NE(db
->open(cout
), 0);
1219 ASSERT_EQ(db
->reshard("Evade(4)"), 0);
1220 ASSERT_EQ(db
->open(cout
), 0);
// Changes only the hash range of an existing sharding ("Evade(4,0-3)" to
// "Evade(4,0-8)"). With a failure injected after successful processing,
// reshard() must return -1002 and the store must refuse to open until the
// reshard is repeated successfully.
1225 TEST_F(RocksDBResharding
, prevent_incomplete_hash_change
) {
1226 ASSERT_EQ(0, db
->create_and_open(cout
, "Evade(4,0-3)"));
1231 RocksDBStore::resharding_ctrl ctrl
;
1232 ctrl
.unittest_fail_after_successful_processing
= true;
1233 ASSERT_EQ(db
->reshard("Evade(4,0-8)", &ctrl
), -1002);
// The hash-range change was not finalized, so the store must not open.
1234 ASSERT_NE(db
->open(cout
), 0);
1235 ASSERT_EQ(db
->reshard("Evade(4,0-8)"), 0);
1236 ASSERT_EQ(db
->open(cout
), 0);
// Changes the reshard target after each interruption: three reshard()
// attempts with different targets each fail at a different injected point
// (-1000, -1001, -1002) and leave the store unopenable; a final reshard to
// yet another target succeeds and the store opens.
1241 TEST_F(RocksDBResharding
, change_reshard
) {
1242 ASSERT_EQ(0, db
->create_and_open(cout
, "Ad(4)"));
// Attempt 1: fail after the first batch.
1248 RocksDBStore::resharding_ctrl ctrl
;
1249 ctrl
.unittest_fail_after_first_batch
= true;
1250 ASSERT_EQ(db
->reshard("C(5) D(3)", &ctrl
), -1000);
// Attempt 2: new target, fail after processing a column.
1253 RocksDBStore::resharding_ctrl ctrl
;
1254 ASSERT_NE(db
->open(cout
), 0);
1255 ctrl
.unittest_fail_after_first_batch
= false;
1256 ctrl
.unittest_fail_after_processing_column
= true;
1257 ASSERT_EQ(db
->reshard("C(5) Evade(2)", &ctrl
), -1001);
// Attempt 3: new target, fail after successful processing.
1260 RocksDBStore::resharding_ctrl ctrl
;
1261 ASSERT_NE(db
->open(cout
), 0);
1262 ctrl
.unittest_fail_after_processing_column
= false;
1263 ctrl
.unittest_fail_after_successful_processing
= true;
1264 ASSERT_EQ(db
->reshard("Evade(2) D(3)", &ctrl
), -1002);
// Final attempt without fault injection must complete.
1267 ASSERT_NE(db
->open(cout
), 0);
1268 ASSERT_EQ(db
->reshard("Ad(1) Evade(5)"), 0);
1271 ASSERT_EQ(db
->open(cout
), 0);
// Parameter lists for the value-parameterized suites: one list containing
// "rocksdb" and "memdb" (its suite name is on a line not visible here),
// and a list of sharding definitions for RocksDBShardingTest.
1278 INSTANTIATE_TEST_SUITE_P(
1281 ::testing::Values("rocksdb", "memdb"));
1283 INSTANTIATE_TEST_SUITE_P(
1285 RocksDBShardingTest
,
1286 ::testing::Values("Betelgeuse D",
1289 "Betelgeuse(3) D(3)"));
// Test entry point: initializes the Ceph global context as a client
// utility without a default config file, sets the
// "enable_experimental_unrecoverable_data_corrupting_features" option
// (the value is on a line not visible here), applies the change and runs
// all GoogleTest tests.
1291 int main(int argc
, char **argv
) {
1292 auto args
= argv_to_vec(argc
, argv
);
// cct is kept in a local for the duration of main().
1293 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
1294 CODE_ENVIRONMENT_UTILITY
,
1295 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
1296 common_init_finish(g_ceph_context
);
1297 g_ceph_context
->_conf
.set_val(
1298 "enable_experimental_unrecoverable_data_corrupting_features",
1300 g_ceph_context
->_conf
.apply_changes(nullptr);
// The process exit code is the GoogleTest result.
1302 ::testing::InitGoogleTest(&argc
, argv
);
1303 return RUN_ALL_TESTS();