]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.vector; | |
19 | ||
20 | import static org.apache.arrow.util.Preconditions.checkArgument; | |
21 | ||
22 | import java.util.ArrayList; | |
23 | import java.util.Iterator; | |
24 | import java.util.List; | |
25 | ||
26 | import org.apache.arrow.memory.ArrowBuf; | |
27 | import org.apache.arrow.util.Collections2; | |
28 | import org.apache.arrow.vector.compression.CompressionCodec; | |
29 | import org.apache.arrow.vector.compression.CompressionUtil; | |
30 | import org.apache.arrow.vector.compression.NoCompressionCodec; | |
31 | import org.apache.arrow.vector.ipc.message.ArrowFieldNode; | |
32 | import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; | |
33 | import org.apache.arrow.vector.types.pojo.Field; | |
34 | ||
35 | /** | |
36 | * Loads buffers into vectors. | |
37 | */ | |
38 | public class VectorLoader { | |
39 | ||
40 | private final VectorSchemaRoot root; | |
41 | ||
42 | private final CompressionCodec.Factory factory; | |
43 | ||
44 | /** | |
45 | * A flag indicating if decompression is needed. | |
46 | * This will affect the behavior of releasing buffers. | |
47 | */ | |
48 | private boolean decompressionNeeded; | |
49 | ||
50 | /** | |
51 | * Construct with a root to load and will create children in root based on schema. | |
52 | * | |
53 | * @param root the root to add vectors to based on schema | |
54 | */ | |
55 | public VectorLoader(VectorSchemaRoot root) { | |
56 | this(root, NoCompressionCodec.Factory.INSTANCE); | |
57 | } | |
58 | ||
59 | /** | |
60 | * Construct with a root to load and will create children in root based on schema. | |
61 | * | |
62 | * @param root the root to add vectors to based on schema. | |
63 | * @param factory the factory to create codec. | |
64 | */ | |
65 | public VectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) { | |
66 | this.root = root; | |
67 | this.factory = factory; | |
68 | } | |
69 | ||
70 | /** | |
71 | * Loads the record batch in the vectors. | |
72 | * will not close the record batch | |
73 | * | |
74 | * @param recordBatch the batch to load | |
75 | */ | |
76 | public void load(ArrowRecordBatch recordBatch) { | |
77 | Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator(); | |
78 | Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator(); | |
79 | CompressionUtil.CodecType codecType = | |
80 | CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec()); | |
81 | decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION; | |
82 | CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE; | |
83 | for (FieldVector fieldVector : root.getFieldVectors()) { | |
84 | loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); | |
85 | } | |
86 | root.setRowCount(recordBatch.getLength()); | |
87 | if (nodes.hasNext() || buffers.hasNext()) { | |
88 | throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " + | |
89 | Collections2.toString(nodes) + " buffers: " + Collections2.toString(buffers)); | |
90 | } | |
91 | } | |
92 | ||
93 | private void loadBuffers( | |
94 | FieldVector vector, | |
95 | Field field, | |
96 | Iterator<ArrowBuf> buffers, | |
97 | Iterator<ArrowFieldNode> nodes, | |
98 | CompressionCodec codec) { | |
99 | checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector); | |
100 | ArrowFieldNode fieldNode = nodes.next(); | |
101 | int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType()); | |
102 | List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount); | |
103 | for (int j = 0; j < bufferLayoutCount; j++) { | |
104 | ArrowBuf nextBuf = buffers.next(); | |
105 | // for vectors without nulls, the buffer is empty, so there is no need to decompress it. | |
106 | ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf; | |
107 | ownBuffers.add(bufferToAdd); | |
108 | if (decompressionNeeded) { | |
109 | // decompression performed | |
110 | nextBuf.getReferenceManager().retain(); | |
111 | } | |
112 | } | |
113 | try { | |
114 | vector.loadFieldBuffers(fieldNode, ownBuffers); | |
115 | if (decompressionNeeded) { | |
116 | for (ArrowBuf buf : ownBuffers) { | |
117 | buf.close(); | |
118 | } | |
119 | } | |
120 | } catch (RuntimeException e) { | |
121 | throw new IllegalArgumentException("Could not load buffers for field " + | |
122 | field + ". error message: " + e.getMessage(), e); | |
123 | } | |
124 | List<Field> children = field.getChildren(); | |
125 | if (children.size() > 0) { | |
126 | List<FieldVector> childrenFromFields = vector.getChildrenFromFields(); | |
127 | checkArgument(children.size() == childrenFromFields.size(), | |
128 | "should have as many children as in the schema: found %s expected %s", | |
129 | childrenFromFields.size(), children.size()); | |
130 | for (int i = 0; i < childrenFromFields.size(); i++) { | |
131 | Field child = children.get(i); | |
132 | FieldVector fieldVector = childrenFromFields.get(i); | |
133 | loadBuffers(fieldVector, child, buffers, nodes, codec); | |
134 | } | |
135 | } | |
136 | } | |
137 | } |