]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/vector/src/test/java/org/apache/arrow/vector/ipc/ITTestIPCWithLargeArrowBuffers.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / test / java / org / apache / arrow / vector / ipc / ITTestIPCWithLargeArrowBuffers.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.vector.ipc;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.util.Arrays;
30 import java.util.Map;
31
32 import org.apache.arrow.memory.RootAllocator;
33 import org.apache.arrow.vector.BigIntVector;
34 import org.apache.arrow.vector.IntVector;
35 import org.apache.arrow.vector.VectorSchemaRoot;
36 import org.apache.arrow.vector.dictionary.Dictionary;
37 import org.apache.arrow.vector.dictionary.DictionaryProvider;
38 import org.apache.arrow.vector.types.pojo.ArrowType;
39 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
40 import org.apache.arrow.vector.types.pojo.Field;
41 import org.apache.arrow.vector.types.pojo.FieldType;
42 import org.apache.arrow.vector.types.pojo.Schema;
43 import org.junit.Test;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
/**
 * Integration test for reading/writing {@link org.apache.arrow.vector.VectorSchemaRoot} with
 * large (more than 2GB) buffers by {@link ArrowReader} and {@link ArrowWriter}.
 * To run this test, please make sure there is at least 8GB free memory and 8GB
 * free disk space in the system.
 */
53 public class ITTestIPCWithLargeArrowBuffers {
54
55 private static final Logger logger = LoggerFactory.getLogger(ITTestIPCWithLargeArrowBuffers.class);
56
57 // 4GB buffer size
58 static final long BUFFER_SIZE = 4 * 1024 * 1024 * 1024L;
59
60 static final int DICTIONARY_VECTOR_SIZE = (int) (BUFFER_SIZE / BigIntVector.TYPE_WIDTH);
61
62 static final int ENCODED_VECTOR_SIZE = (int) (BUFFER_SIZE / IntVector.TYPE_WIDTH);
63
64 static final String FILE_NAME = "largeArrowData.data";
65
66 static final long DICTIONARY_ID = 123L;
67
68 static final ArrowType.Int ENCODED_VECTOR_TYPE = new ArrowType.Int(32, true);
69
70 static final DictionaryEncoding DICTIONARY_ENCODING =
71 new DictionaryEncoding(DICTIONARY_ID, false, ENCODED_VECTOR_TYPE);
72
73 static final FieldType ENCODED_FIELD_TYPE =
74 new FieldType(true, ENCODED_VECTOR_TYPE, DICTIONARY_ENCODING, null);
75
76 static final Field ENCODED_VECTOR_FIELD = new Field("encoded vector", ENCODED_FIELD_TYPE, null);
77
78 private void testWriteLargeArrowData(boolean streamMode) throws IOException {
79 // simulate encoding big int as int
80 try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
81 BigIntVector dictVector = new BigIntVector("dic vector", allocator);
82 FileOutputStream out = new FileOutputStream(FILE_NAME);
83 IntVector encodedVector = (IntVector) ENCODED_VECTOR_FIELD.createVector(allocator)) {
84
85 // prepare dictionary provider.
86 DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
87 Dictionary dictionary = new Dictionary(dictVector, DICTIONARY_ENCODING);
88 provider.put(dictionary);
89
90 // populate the dictionary vector
91 dictVector.allocateNew(DICTIONARY_VECTOR_SIZE);
92 for (int i = 0; i < DICTIONARY_VECTOR_SIZE; i++) {
93 dictVector.set(i, i);
94 }
95 dictVector.setValueCount(DICTIONARY_VECTOR_SIZE);
96 assertTrue(dictVector.getDataBuffer().capacity() > Integer.MAX_VALUE);
97 logger.trace("Populating dictionary vector finished");
98
99 // populate the encoded vector
100 encodedVector.allocateNew(ENCODED_VECTOR_SIZE);
101 for (int i = 0; i < ENCODED_VECTOR_SIZE; i++) {
102 encodedVector.set(i, i % DICTIONARY_VECTOR_SIZE);
103 }
104 encodedVector.setValueCount(ENCODED_VECTOR_SIZE);
105 assertTrue(encodedVector.getDataBuffer().capacity() > Integer.MAX_VALUE);
106 logger.trace("Populating encoded vector finished");
107
108 // build vector schema root and write data.
109 try (VectorSchemaRoot root =
110 new VectorSchemaRoot(
111 Arrays.asList(ENCODED_VECTOR_FIELD), Arrays.asList(encodedVector), ENCODED_VECTOR_SIZE);
112 ArrowWriter writer = streamMode ?
113 new ArrowStreamWriter(root, provider, out) :
114 new ArrowFileWriter(root, provider, out.getChannel())) {
115 writer.start();
116 writer.writeBatch();
117 writer.end();
118 logger.trace("Writing data finished");
119 }
120 }
121
122 assertTrue(new File(FILE_NAME).exists());
123 }
124
125 private void testReadLargeArrowData(boolean streamMode) throws IOException {
126 try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
127 FileInputStream in = new FileInputStream(FILE_NAME);
128 ArrowReader reader = streamMode ?
129 new ArrowStreamReader(in, allocator) :
130 new ArrowFileReader(in.getChannel(), allocator)) {
131
132 // verify schema
133 Schema readSchema = reader.getVectorSchemaRoot().getSchema();
134 assertEquals(1, readSchema.getFields().size());
135 assertEquals(ENCODED_VECTOR_FIELD, readSchema.getFields().get(0));
136 logger.trace("Verifying schema finished");
137
138 // verify vector schema root
139 assertTrue(reader.loadNextBatch());
140 VectorSchemaRoot root = reader.getVectorSchemaRoot();
141
142 assertEquals(ENCODED_VECTOR_SIZE, root.getRowCount());
143 assertEquals(1, root.getFieldVectors().size());
144 assertTrue(root.getFieldVectors().get(0) instanceof IntVector);
145
146 IntVector encodedVector = (IntVector) root.getVector(0);
147 for (int i = 0; i < ENCODED_VECTOR_SIZE; i++) {
148 assertEquals(i % DICTIONARY_VECTOR_SIZE, encodedVector.get(i));
149 }
150 logger.trace("Verifying encoded vector finished");
151
152 // verify dictionary
153 Map<Long, Dictionary> dictVectors = reader.getDictionaryVectors();
154 assertEquals(1, dictVectors.size());
155 Dictionary dictionary = dictVectors.get(DICTIONARY_ID);
156 assertNotNull(dictionary);
157
158 assertTrue(dictionary.getVector() instanceof BigIntVector);
159 BigIntVector dictVector = (BigIntVector) dictionary.getVector();
160 assertEquals(DICTIONARY_VECTOR_SIZE, dictVector.getValueCount());
161 for (int i = 0; i < DICTIONARY_VECTOR_SIZE; i++) {
162 assertEquals(i, dictVector.get(i));
163 }
164 logger.trace("Verifying dictionary vector finished");
165
166 // ensure no more data available
167 assertFalse(reader.loadNextBatch());
168 } finally {
169 File dataFile = new File(FILE_NAME);
170 dataFile.delete();
171 assertFalse(dataFile.exists());
172 }
173 }
174
175 @Test
176 public void testIPC() throws IOException {
177 logger.trace("Start testing reading/writing large arrow stream data");
178 testWriteLargeArrowData(true);
179 testReadLargeArrowData(true);
180 logger.trace("Finish testing reading/writing large arrow stream data");
181
182 logger.trace("Start testing reading/writing large arrow file data");
183 testWriteLargeArrowData(false);
184 testReadLargeArrowData(false);
185 logger.trace("Finish testing reading/writing large arrow file data");
186 }
187 }