]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / vector / src / main / java / org / apache / arrow / vector / VectorLoader.java
CommitLineData
1d09f67e
TL
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
18package org.apache.arrow.vector;
19
20import static org.apache.arrow.util.Preconditions.checkArgument;
21
22import java.util.ArrayList;
23import java.util.Iterator;
24import java.util.List;
25
26import org.apache.arrow.memory.ArrowBuf;
27import org.apache.arrow.util.Collections2;
28import org.apache.arrow.vector.compression.CompressionCodec;
29import org.apache.arrow.vector.compression.CompressionUtil;
30import org.apache.arrow.vector.compression.NoCompressionCodec;
31import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
32import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
33import org.apache.arrow.vector.types.pojo.Field;
34
35/**
36 * Loads buffers into vectors.
37 */
38public class VectorLoader {
39
40 private final VectorSchemaRoot root;
41
42 private final CompressionCodec.Factory factory;
43
44 /**
45 * A flag indicating if decompression is needed.
46 * This will affect the behavior of releasing buffers.
47 */
48 private boolean decompressionNeeded;
49
50 /**
51 * Construct with a root to load and will create children in root based on schema.
52 *
53 * @param root the root to add vectors to based on schema
54 */
55 public VectorLoader(VectorSchemaRoot root) {
56 this(root, NoCompressionCodec.Factory.INSTANCE);
57 }
58
59 /**
60 * Construct with a root to load and will create children in root based on schema.
61 *
62 * @param root the root to add vectors to based on schema.
63 * @param factory the factory to create codec.
64 */
65 public VectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) {
66 this.root = root;
67 this.factory = factory;
68 }
69
70 /**
71 * Loads the record batch in the vectors.
72 * will not close the record batch
73 *
74 * @param recordBatch the batch to load
75 */
76 public void load(ArrowRecordBatch recordBatch) {
77 Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
78 Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
79 CompressionUtil.CodecType codecType =
80 CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec());
81 decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION;
82 CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE;
83 for (FieldVector fieldVector : root.getFieldVectors()) {
84 loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
85 }
86 root.setRowCount(recordBatch.getLength());
87 if (nodes.hasNext() || buffers.hasNext()) {
88 throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " +
89 Collections2.toString(nodes) + " buffers: " + Collections2.toString(buffers));
90 }
91 }
92
93 private void loadBuffers(
94 FieldVector vector,
95 Field field,
96 Iterator<ArrowBuf> buffers,
97 Iterator<ArrowFieldNode> nodes,
98 CompressionCodec codec) {
99 checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector);
100 ArrowFieldNode fieldNode = nodes.next();
101 int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
102 List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
103 for (int j = 0; j < bufferLayoutCount; j++) {
104 ArrowBuf nextBuf = buffers.next();
105 // for vectors without nulls, the buffer is empty, so there is no need to decompress it.
106 ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf;
107 ownBuffers.add(bufferToAdd);
108 if (decompressionNeeded) {
109 // decompression performed
110 nextBuf.getReferenceManager().retain();
111 }
112 }
113 try {
114 vector.loadFieldBuffers(fieldNode, ownBuffers);
115 if (decompressionNeeded) {
116 for (ArrowBuf buf : ownBuffers) {
117 buf.close();
118 }
119 }
120 } catch (RuntimeException e) {
121 throw new IllegalArgumentException("Could not load buffers for field " +
122 field + ". error message: " + e.getMessage(), e);
123 }
124 List<Field> children = field.getChildren();
125 if (children.size() > 0) {
126 List<FieldVector> childrenFromFields = vector.getChildrenFromFields();
127 checkArgument(children.size() == childrenFromFields.size(),
128 "should have as many children as in the schema: found %s expected %s",
129 childrenFromFields.size(), children.size());
130 for (int i = 0; i < childrenFromFields.size(); i++) {
131 Field child = children.get(i);
132 FieldVector fieldVector = childrenFromFields.get(i);
133 loadBuffers(fieldVector, child, buffers, nodes, codec);
134 }
135 }
136 }
137}