import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.rocksdb.util.ByteBufferAllocator;
public class WriteBatchWithIndexTest {
}
}
+ @Test
+ public void readYourOwnWritesCfIterDirectBB() throws RocksDBException {
+ readYourOwnWritesCfIterDirect(ByteBufferAllocator.DIRECT);
+ }
+
+ @Test
+ public void readYourOwnWritesCfIterIndirectBB() throws RocksDBException {
+ readYourOwnWritesCfIterDirect(ByteBufferAllocator.HEAP);
+ }
+
+ public void readYourOwnWritesCfIterDirect(final ByteBufferAllocator byteBufferAllocator)
+ throws RocksDBException {
+ final List<ColumnFamilyDescriptor> cfNames =
+ Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
+ new ColumnFamilyDescriptor("new_cf".getBytes()));
+
+ final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
+
+ // Test open database with column family names
+ try (final DBOptions options =
+ new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
+ final RocksDB db = RocksDB.open(
+ options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
+ final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1);
+
+ try {
+ final byte[] kv1 = "key1".getBytes();
+ final byte[] vv1 = "value1".getBytes();
+ final ByteBuffer k1 = byteBufferAllocator.allocate(12);
+ k1.put(kv1);
+ final byte[] kv2 = "key2".getBytes();
+ final byte[] vv2 = "value2".getBytes();
+ final ByteBuffer k2 = byteBufferAllocator.allocate(12);
+ k2.put(kv2);
+
+ db.put(newCf, kv1, vv1);
+ db.put(newCf, kv2, vv2);
+
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+ final ReadOptions readOptions = new ReadOptions();
+ final RocksIterator base = db.newIterator(newCf, readOptions);
+ final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) {
+ k1.flip();
+ it.seek(k1);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv1);
+ assertThat(it.value()).isEqualTo(vv1);
+
+ k2.flip();
+ it.seek(k2);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv2);
+ assertThat(it.value()).isEqualTo(vv2);
+
+ final byte[] kv1point5 = "key1point5".getBytes();
+ final ByteBuffer k1point5 = byteBufferAllocator.allocate(12);
+ k1point5.put(kv1point5);
+
+ k1point5.flip();
+ it.seek(k1point5);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv2);
+ assertThat(it.value()).isEqualTo(vv2);
+
+ k1point5.flip();
+ it.seekForPrev(k1point5);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv1);
+ assertThat(it.value()).isEqualTo(vv1);
+
+ // put data to the write batch and make sure we can read it.
+ final byte[] kv3 = "key3".getBytes();
+ final ByteBuffer k3 = byteBufferAllocator.allocate(12);
+ k3.put(kv3);
+ final byte[] vv3 = "value3".getBytes();
+ wbwi.put(newCf, kv3, vv3);
+ k3.flip();
+ it.seek(k3);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv3);
+ assertThat(it.value()).isEqualTo(vv3);
+
+ // update k2 in the write batch and check the value
+ final byte[] v2Other = "otherValue2".getBytes();
+ wbwi.put(newCf, kv2, v2Other);
+ k2.flip();
+ it.seek(k2);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv2);
+ assertThat(it.value()).isEqualTo(v2Other);
+
+ // delete k1 and make sure we can read back the write
+ wbwi.delete(newCf, kv1);
+ k1.flip();
+ it.seek(k1);
+ assertThat(it.key()).isNotEqualTo(kv1);
+
+ // reinsert k1 and make sure we see the new value
+ final byte[] v1Other = "otherValue1".getBytes();
+ wbwi.put(newCf, kv1, v1Other);
+ k1.flip();
+ it.seek(k1);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv1);
+ assertThat(it.value()).isEqualTo(v1Other);
+
+ // single remove k3 and make sure we can read back the write
+ wbwi.singleDelete(newCf, kv3);
+ k3.flip();
+ it.seek(k3);
+ assertThat(it.isValid()).isEqualTo(false);
+
+ // reinsert k3 and make sure we see the new value
+ final byte[] v3Other = "otherValue3".getBytes();
+ wbwi.put(newCf, kv3, v3Other);
+ k3.flip();
+ it.seek(k3);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv3);
+ assertThat(it.value()).isEqualTo(v3Other);
+ }
+ } finally {
+ for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
+ columnFamilyHandle.close();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void readYourOwnWritesCfIterIndirect() throws RocksDBException {
+ final List<ColumnFamilyDescriptor> cfNames =
+ Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
+ new ColumnFamilyDescriptor("new_cf".getBytes()));
+
+ final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
+
+ // Test open database with column family names
+ try (final DBOptions options =
+ new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
+ final RocksDB db = RocksDB.open(
+ options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
+ final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1);
+
+ try {
+ final byte[] kv1 = "key1".getBytes();
+ final byte[] vv1 = "value1".getBytes();
+ final ByteBuffer k1 = ByteBuffer.allocate(12);
+ k1.put(kv1).flip();
+ final byte[] kv2 = "key2".getBytes();
+ final byte[] vv2 = "value2".getBytes();
+ final ByteBuffer k2 = ByteBuffer.allocate(12);
+ k2.put(kv2).flip();
+
+ db.put(newCf, kv1, vv1);
+ db.put(newCf, kv2, vv2);
+
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+ final ReadOptions readOptions = new ReadOptions();
+ final RocksIterator base = db.newIterator(newCf, readOptions);
+ final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) {
+ it.seek(k1);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv1);
+ assertThat(it.value()).isEqualTo(vv1);
+
+ it.seek(k2);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv2);
+ assertThat(it.value()).isEqualTo(vv2);
+
+ // put data to the write batch and make sure we can read it.
+ final byte[] kv3 = "key3".getBytes();
+ final ByteBuffer k3 = ByteBuffer.allocate(12);
+ k3.put(kv3);
+ final byte[] vv3 = "value3".getBytes();
+ wbwi.put(newCf, kv3, vv3);
+ k3.flip();
+ it.seek(k3);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv3);
+ assertThat(it.value()).isEqualTo(vv3);
+
+ // update k2 in the write batch and check the value
+ final byte[] v2Other = "otherValue2".getBytes();
+ wbwi.put(newCf, kv2, v2Other);
+ k2.flip();
+ it.seek(k2);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv2);
+ assertThat(it.value()).isEqualTo(v2Other);
+
+ // delete k1 and make sure we can read back the write
+ wbwi.delete(newCf, kv1);
+ k1.flip();
+ it.seek(k1);
+ assertThat(it.key()).isNotEqualTo(kv1);
+
+ // reinsert k1 and make sure we see the new value
+ final byte[] v1Other = "otherValue1".getBytes();
+ wbwi.put(newCf, kv1, v1Other);
+ k1.flip();
+ it.seek(k1);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv1);
+ assertThat(it.value()).isEqualTo(v1Other);
+
+ // single remove k3 and make sure we can read back the write
+ wbwi.singleDelete(newCf, kv3);
+ k3.flip();
+ it.seek(k3);
+ assertThat(it.isValid()).isEqualTo(false);
+
+ // reinsert k3 and make sure we see the new value
+ final byte[] v3Other = "otherValue3".getBytes();
+ wbwi.put(newCf, kv3, v3Other);
+ k3.flip();
+ it.seek(k3);
+ assertThat(it.isValid()).isTrue();
+ assertThat(it.key()).isEqualTo(kv3);
+ assertThat(it.value()).isEqualTo(v3Other);
+ }
+ } finally {
+ for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
+ columnFamilyHandle.close();
+ }
+ }
+ }
+ }
+
@Test
public void writeBatchWithIndex() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
public void write_writeBatchWithIndexDirect() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
- ByteBuffer k1 = ByteBuffer.allocateDirect(16);
- ByteBuffer v1 = ByteBuffer.allocateDirect(16);
- ByteBuffer k2 = ByteBuffer.allocateDirect(16);
- ByteBuffer v2 = ByteBuffer.allocateDirect(16);
+ final ByteBuffer k1 = ByteBuffer.allocateDirect(16);
+ final ByteBuffer v1 = ByteBuffer.allocateDirect(16);
+ final ByteBuffer k2 = ByteBuffer.allocateDirect(16);
+ final ByteBuffer v2 = ByteBuffer.allocateDirect(16);
k1.put("key1".getBytes()).flip();
v1.put("value1".getBytes()).flip();
k2.put("key2".getBytes()).flip();
final String v3 = "value3";
final String k4 = "key4";
final String k5 = "key5";
- final String k6 = "key6";
- final String k7 = "key7";
final String v8 = "value8";
final byte[] k1b = k1.getBytes(UTF_8);
final byte[] v1b = v1.getBytes(UTF_8);
final byte[] v3b = v3.getBytes(UTF_8);
final byte[] k4b = k4.getBytes(UTF_8);
final byte[] k5b = k5.getBytes(UTF_8);
- final byte[] k6b = k6.getBytes(UTF_8);
- final byte[] k7b = k7.getBytes(UTF_8);
final byte[] v8b = v8.getBytes(UTF_8);
+ final String k1point5 = "key1point5";
+ final String k2point5 = "key2point5";
+
// add put records
wbwi.put(k1b, v1b);
wbwi.put(k2b, v2b);
try (final WBWIRocksIterator it = wbwi.newIterator()) {
//direct access - seek to key offsets
final int[] testOffsets = {2, 0, 3, 4, 1};
-
- for (int i = 0; i < testOffsets.length; i++) {
- final int testOffset = testOffsets[i];
+ for (final int testOffset : testOffsets) {
final byte[] key = toArray(expected[testOffset].getKey().data());
it.seek(key);
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[testOffset]);
+ }
+
+ for (final int testOffset : testOffsets) {
+ final byte[] key = toArray(expected[testOffset].getKey().data());
// Direct buffer seek
- expected[testOffset].getKey().data().mark();
- ByteBuffer db = expected[testOffset].getKey().data();
+ final ByteBuffer db = expected[testOffset].getKey().data();
it.seek(db);
assertThat(db.position()).isEqualTo(key.length);
assertThat(it.isValid()).isTrue();
+
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[testOffset]);
+ }
+
+ for (final int testOffset : testOffsets) {
+ final byte[] key = toArray(expected[testOffset].getKey().data());
+
+ // Direct buffer seek
+ final ByteBuffer db = expected[testOffset].getKey().data();
+ it.seekForPrev(db);
+ assertThat(db.position()).isEqualTo(key.length);
+ assertThat(it.isValid()).isTrue();
+
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[testOffset]);
+ }
+
+ for (final int testOffset : testOffsets) {
+ final byte[] key = toArray(expected[testOffset].getKey().data());
+
+ // Indirect buffer seek
+ final ByteBuffer db = ByteBuffer.allocate(key.length);
+ System.arraycopy(key, 0, db.array(), 0, key.length);
+ it.seek(db);
+ assertThat(db.position()).isEqualTo(key.length);
+ assertThat(it.isValid()).isTrue();
+
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[testOffset]);
+ }
+
+ for (final int testOffset : testOffsets) {
+ final byte[] key = toArray(expected[testOffset].getKey().data());
+
+ // Indirect buffer seek for prev
+ final ByteBuffer db = ByteBuffer.allocate(key.length);
+ System.arraycopy(key, 0, db.array(), 0, key.length);
+ it.seekForPrev(db);
+ assertThat(db.position()).isEqualTo(key.length);
+ assertThat(it.isValid()).isTrue();
+
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[testOffset]);
+ }
+
+ {
+ it.seekForPrev(k2point5.getBytes());
+ assertThat(it.isValid()).isTrue();
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[1]);
+ }
+
+ {
+ it.seekForPrev(k1point5.getBytes());
+ assertThat(it.isValid()).isTrue();
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[0]);
+ }
+
+ {
+ final ByteBuffer db = ByteBuffer.allocate(k2point5.length());
+ db.put(k2point5.getBytes());
+ db.flip();
+ it.seekForPrev(db);
+ assertThat(it.isValid()).isTrue();
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[1]);
+ }
+
+ {
+ final ByteBuffer db = ByteBuffer.allocate(k1point5.length());
+ db.put(k1point5.getBytes());
+ db.flip();
+ it.seekForPrev(db);
+ assertThat(it.isValid()).isTrue();
+ final WBWIRocksIterator.WriteEntry entry = it.entry();
+ assertThat(entry).isEqualTo(expected[0]);
}
//forward iterative access
assertThat(db.get("key4".getBytes())).isEqualTo("xyz".getBytes());
}
}
+
+ @Test
+ public void iteratorWithBaseOverwriteTrue() throws RocksDBException {
+ try (final Options options = new Options().setCreateIfMissing(true);
+ final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+ final RocksIterator baseIter = db.newIterator();
+ final RocksIterator wbwiIter = wbwi.newIteratorWithBase(baseIter)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+ final RocksIterator baseIter = db.newIterator();
+ final ReadOptions readOptions = new ReadOptions();
+ final RocksIterator wbwiIter = wbwi.newIteratorWithBase(baseIter, readOptions)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+ }
+
+ final List<ColumnFamilyDescriptor> cfNames =
+ Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
+ new ColumnFamilyDescriptor("new_cf".getBytes()));
+ final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
+ try (final DBOptions options =
+ new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
+ final RocksDB db = RocksDB.open(
+ options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+ final RocksIterator baseIter = db.newIterator();
+ final RocksIterator wbwiIter =
+ wbwi.newIteratorWithBase(columnFamilyHandleList.get(1), baseIter)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+ final RocksIterator baseIter = db.newIterator();
+ final ReadOptions readOptions = new ReadOptions();
+ final RocksIterator wbwiIter =
+ wbwi.newIteratorWithBase(columnFamilyHandleList.get(1), baseIter, readOptions)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+ }
+ }
+
+ @Test
+ public void iteratorWithBaseOverwriteFalse() throws RocksDBException {
+ try (final Options options = new Options().setCreateIfMissing(true);
+ final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(false);
+ final RocksIterator baseIter = db.newIterator();
+ final RocksIterator wbwiIter = wbwi.newIteratorWithBase(baseIter)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(false);
+ final RocksIterator baseIter = db.newIterator();
+ final ReadOptions readOptions = new ReadOptions();
+ final RocksIterator wbwiIter = wbwi.newIteratorWithBase(baseIter, readOptions)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+ }
+
+ final List<ColumnFamilyDescriptor> cfNames =
+ Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
+ new ColumnFamilyDescriptor("new_cf".getBytes()));
+ final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
+ try (final DBOptions options =
+ new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
+ final RocksDB db = RocksDB.open(
+ options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(false);
+ final RocksIterator baseIter = db.newIterator();
+ final RocksIterator wbwiIter =
+ wbwi.newIteratorWithBase(columnFamilyHandleList.get(1), baseIter)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+
+ try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(false);
+ final RocksIterator baseIter = db.newIterator();
+ final ReadOptions readOptions = new ReadOptions();
+ final RocksIterator wbwiIter =
+ wbwi.newIteratorWithBase(columnFamilyHandleList.get(1), baseIter, readOptions)) {
+ assertThat(wbwiIter).isNotNull();
+ assertThat(wbwiIter.nativeHandle_).isGreaterThan(0);
+ wbwiIter.status();
+ }
+ }
+ }
}