/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.vector.ipc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Integration test for reading/writing {@link org.apache.arrow.vector.VectorSchemaRoot} with
 * large (more than 2GB) buffers by {@link ArrowReader} and {@link ArrowWriter}.
 * To run this test, please make sure there is at least 8GB free memory, and 8GB
 * free disk space in the system.
 */
53 public class ITTestIPCWithLargeArrowBuffers
{
55 private static final Logger logger
= LoggerFactory
.getLogger(ITTestIPCWithLargeArrowBuffers
.class);
58 static final long BUFFER_SIZE
= 4 * 1024 * 1024 * 1024L;
60 static final int DICTIONARY_VECTOR_SIZE
= (int) (BUFFER_SIZE
/ BigIntVector
.TYPE_WIDTH
);
62 static final int ENCODED_VECTOR_SIZE
= (int) (BUFFER_SIZE
/ IntVector
.TYPE_WIDTH
);
64 static final String FILE_NAME
= "largeArrowData.data";
66 static final long DICTIONARY_ID
= 123L;
68 static final ArrowType
.Int ENCODED_VECTOR_TYPE
= new ArrowType
.Int(32, true);
70 static final DictionaryEncoding DICTIONARY_ENCODING
=
71 new DictionaryEncoding(DICTIONARY_ID
, false, ENCODED_VECTOR_TYPE
);
73 static final FieldType ENCODED_FIELD_TYPE
=
74 new FieldType(true, ENCODED_VECTOR_TYPE
, DICTIONARY_ENCODING
, null);
76 static final Field ENCODED_VECTOR_FIELD
= new Field("encoded vector", ENCODED_FIELD_TYPE
, null);
78 private void testWriteLargeArrowData(boolean streamMode
) throws IOException
{
79 // simulate encoding big int as int
80 try (RootAllocator allocator
= new RootAllocator(Long
.MAX_VALUE
);
81 BigIntVector dictVector
= new BigIntVector("dic vector", allocator
);
82 FileOutputStream out
= new FileOutputStream(FILE_NAME
);
83 IntVector encodedVector
= (IntVector
) ENCODED_VECTOR_FIELD
.createVector(allocator
)) {
85 // prepare dictionary provider.
86 DictionaryProvider
.MapDictionaryProvider provider
= new DictionaryProvider
.MapDictionaryProvider();
87 Dictionary dictionary
= new Dictionary(dictVector
, DICTIONARY_ENCODING
);
88 provider
.put(dictionary
);
90 // populate the dictionary vector
91 dictVector
.allocateNew(DICTIONARY_VECTOR_SIZE
);
92 for (int i
= 0; i
< DICTIONARY_VECTOR_SIZE
; i
++) {
95 dictVector
.setValueCount(DICTIONARY_VECTOR_SIZE
);
96 assertTrue(dictVector
.getDataBuffer().capacity() > Integer
.MAX_VALUE
);
97 logger
.trace("Populating dictionary vector finished");
99 // populate the encoded vector
100 encodedVector
.allocateNew(ENCODED_VECTOR_SIZE
);
101 for (int i
= 0; i
< ENCODED_VECTOR_SIZE
; i
++) {
102 encodedVector
.set(i
, i
% DICTIONARY_VECTOR_SIZE
);
104 encodedVector
.setValueCount(ENCODED_VECTOR_SIZE
);
105 assertTrue(encodedVector
.getDataBuffer().capacity() > Integer
.MAX_VALUE
);
106 logger
.trace("Populating encoded vector finished");
108 // build vector schema root and write data.
109 try (VectorSchemaRoot root
=
110 new VectorSchemaRoot(
111 Arrays
.asList(ENCODED_VECTOR_FIELD
), Arrays
.asList(encodedVector
), ENCODED_VECTOR_SIZE
);
112 ArrowWriter writer
= streamMode ?
113 new ArrowStreamWriter(root
, provider
, out
) :
114 new ArrowFileWriter(root
, provider
, out
.getChannel())) {
118 logger
.trace("Writing data finished");
122 assertTrue(new File(FILE_NAME
).exists());
125 private void testReadLargeArrowData(boolean streamMode
) throws IOException
{
126 try (RootAllocator allocator
= new RootAllocator(Long
.MAX_VALUE
);
127 FileInputStream in
= new FileInputStream(FILE_NAME
);
128 ArrowReader reader
= streamMode ?
129 new ArrowStreamReader(in
, allocator
) :
130 new ArrowFileReader(in
.getChannel(), allocator
)) {
133 Schema readSchema
= reader
.getVectorSchemaRoot().getSchema();
134 assertEquals(1, readSchema
.getFields().size());
135 assertEquals(ENCODED_VECTOR_FIELD
, readSchema
.getFields().get(0));
136 logger
.trace("Verifying schema finished");
138 // verify vector schema root
139 assertTrue(reader
.loadNextBatch());
140 VectorSchemaRoot root
= reader
.getVectorSchemaRoot();
142 assertEquals(ENCODED_VECTOR_SIZE
, root
.getRowCount());
143 assertEquals(1, root
.getFieldVectors().size());
144 assertTrue(root
.getFieldVectors().get(0) instanceof IntVector
);
146 IntVector encodedVector
= (IntVector
) root
.getVector(0);
147 for (int i
= 0; i
< ENCODED_VECTOR_SIZE
; i
++) {
148 assertEquals(i
% DICTIONARY_VECTOR_SIZE
, encodedVector
.get(i
));
150 logger
.trace("Verifying encoded vector finished");
153 Map
<Long
, Dictionary
> dictVectors
= reader
.getDictionaryVectors();
154 assertEquals(1, dictVectors
.size());
155 Dictionary dictionary
= dictVectors
.get(DICTIONARY_ID
);
156 assertNotNull(dictionary
);
158 assertTrue(dictionary
.getVector() instanceof BigIntVector
);
159 BigIntVector dictVector
= (BigIntVector
) dictionary
.getVector();
160 assertEquals(DICTIONARY_VECTOR_SIZE
, dictVector
.getValueCount());
161 for (int i
= 0; i
< DICTIONARY_VECTOR_SIZE
; i
++) {
162 assertEquals(i
, dictVector
.get(i
));
164 logger
.trace("Verifying dictionary vector finished");
166 // ensure no more data available
167 assertFalse(reader
.loadNextBatch());
169 File dataFile
= new File(FILE_NAME
);
171 assertFalse(dataFile
.exists());
176 public void testIPC() throws IOException
{
177 logger
.trace("Start testing reading/writing large arrow stream data");
178 testWriteLargeArrowData(true);
179 testReadLargeArrowData(true);
180 logger
.trace("Finish testing reading/writing large arrow stream data");
182 logger
.trace("Start testing reading/writing large arrow file data");
183 testWriteLargeArrowData(false);
184 testReadLargeArrowData(false);
185 logger
.trace("Finish testing reading/writing large arrow file data");