]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightClient.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / test / java / org / apache / arrow / flight / TestFlightClient.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.flight;
19
20 import java.nio.charset.StandardCharsets;
21 import java.util.Collections;
22 import java.util.List;
23
24 import org.apache.arrow.flight.FlightClient.ClientStreamListener;
25 import org.apache.arrow.flight.TestBasicOperation.Producer;
26 import org.apache.arrow.memory.BufferAllocator;
27 import org.apache.arrow.memory.RootAllocator;
28 import org.apache.arrow.vector.FieldVector;
29 import org.apache.arrow.vector.ValueVector;
30 import org.apache.arrow.vector.VarCharVector;
31 import org.apache.arrow.vector.VectorLoader;
32 import org.apache.arrow.vector.VectorSchemaRoot;
33 import org.apache.arrow.vector.VectorUnloader;
34 import org.apache.arrow.vector.dictionary.Dictionary;
35 import org.apache.arrow.vector.dictionary.DictionaryEncoder;
36 import org.apache.arrow.vector.dictionary.DictionaryProvider;
37 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
38 import org.apache.arrow.vector.types.pojo.ArrowType;
39 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
40 import org.apache.arrow.vector.types.pojo.Field;
41 import org.apache.arrow.vector.types.pojo.FieldType;
42 import org.apache.arrow.vector.types.pojo.Schema;
43 import org.junit.Assert;
44 import org.junit.Ignore;
45 import org.junit.Test;
46 import org.junit.jupiter.api.Assertions;
47
48 public class TestFlightClient {
49 /**
50 * ARROW-5063: make sure two clients to the same location can be closed independently.
51 */
52 @Test
53 public void independentShutdown() throws Exception {
54 try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
55 final FlightServer server = FlightTestUtil.getStartedServer(
56 location -> FlightServer.builder(allocator, location,
57 new Producer(allocator)).build())) {
58 final Location location = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
59 final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
60 try (final FlightClient client1 = FlightClient.builder(allocator, location).build();
61 final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
62 // Use startPut as this ensures the RPC won't finish until we want it to
63 final ClientStreamListener listener = client1.startPut(FlightDescriptor.path("test"), root,
64 new AsyncPutListener());
65 try (final FlightClient client2 = FlightClient.builder(allocator, location).build()) {
66 client2.listActions().forEach(actionType -> Assert.assertNotNull(actionType.getType()));
67 }
68 listener.completed();
69 listener.getResult();
70 }
71 }
72 }
73
74 /**
75 * ARROW-5978: make sure that we can properly close a client/stream after requesting dictionaries.
76 */
77 @Ignore // Unfortunately this test is flaky in CI.
78 @Test
79 public void freeDictionaries() throws Exception {
80 final Schema expectedSchema = new Schema(Collections
81 .singletonList(new Field("encoded",
82 new FieldType(true, new ArrowType.Int(32, true), new DictionaryEncoding(1L, false, null)), null)));
83 try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
84 final BufferAllocator serverAllocator = allocator.newChildAllocator("flight-server", 0, Integer.MAX_VALUE);
85 final FlightServer server = FlightTestUtil.getStartedServer(
86 location -> FlightServer.builder(serverAllocator, location,
87 new DictionaryProducer(serverAllocator)).build())) {
88 final Location location = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
89 try (final FlightClient client = FlightClient.builder(allocator, location).build()) {
90 try (final FlightStream stream = client.getStream(new Ticket(new byte[0]))) {
91 Assert.assertTrue(stream.next());
92 Assert.assertNotNull(stream.getDictionaryProvider().lookup(1));
93 final VectorSchemaRoot root = stream.getRoot();
94 Assert.assertEquals(expectedSchema, root.getSchema());
95 Assert.assertEquals(6, root.getVector("encoded").getValueCount());
96 try (final ValueVector decoded = DictionaryEncoder
97 .decode(root.getVector("encoded"), stream.getDictionaryProvider().lookup(1))) {
98 Assert.assertFalse(decoded.isNull(1));
99 Assert.assertTrue(decoded instanceof VarCharVector);
100 Assert.assertArrayEquals("one".getBytes(StandardCharsets.UTF_8), ((VarCharVector) decoded).get(1));
101 }
102 Assert.assertFalse(stream.next());
103 }
104 // Closing stream fails if it doesn't free dictionaries; closing dictionaries fails (refcount goes negative)
105 // if reference isn't retained in ArrowMessage
106 }
107 }
108 }
109
110 /**
111 * ARROW-5978: make sure that dictionary ownership can't be claimed twice.
112 */
113 @Ignore // Unfortunately this test is flaky in CI.
114 @Test
115 public void ownDictionaries() throws Exception {
116 try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
117 final BufferAllocator serverAllocator = allocator.newChildAllocator("flight-server", 0, Integer.MAX_VALUE);
118 final FlightServer server = FlightTestUtil.getStartedServer(
119 location -> FlightServer.builder(serverAllocator, location,
120 new DictionaryProducer(serverAllocator)).build())) {
121 final Location location = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
122 try (final FlightClient client = FlightClient.builder(allocator, location).build()) {
123 try (final FlightStream stream = client.getStream(new Ticket(new byte[0]))) {
124 Assert.assertTrue(stream.next());
125 Assert.assertFalse(stream.next());
126 final DictionaryProvider provider = stream.takeDictionaryOwnership();
127 Assertions.assertThrows(IllegalStateException.class, stream::takeDictionaryOwnership);
128 Assertions.assertThrows(IllegalStateException.class, stream::getDictionaryProvider);
129 DictionaryUtils.closeDictionaries(stream.getSchema(), provider);
130 }
131 }
132 }
133 }
134
135 /**
136 * ARROW-5978: make sure that dictionaries can be used after closing the stream.
137 */
138 @Ignore // Unfortunately this test is flaky in CI.
139 @Test
140 public void useDictionariesAfterClose() throws Exception {
141 try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
142 final BufferAllocator serverAllocator = allocator.newChildAllocator("flight-server", 0, Integer.MAX_VALUE);
143 final FlightServer server = FlightTestUtil.getStartedServer(
144 location -> FlightServer.builder(serverAllocator, location, new DictionaryProducer(serverAllocator))
145 .build())) {
146 final Location location = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
147 try (final FlightClient client = FlightClient.builder(allocator, location).build()) {
148 final VectorSchemaRoot root;
149 final DictionaryProvider provider;
150 try (final FlightStream stream = client.getStream(new Ticket(new byte[0]))) {
151 final VectorUnloader unloader = new VectorUnloader(stream.getRoot());
152 root = VectorSchemaRoot.create(stream.getSchema(), allocator);
153 final VectorLoader loader = new VectorLoader(root);
154 while (stream.next()) {
155 try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
156 loader.load(arb);
157 }
158 }
159 provider = stream.takeDictionaryOwnership();
160 }
161 try (final ValueVector decoded = DictionaryEncoder
162 .decode(root.getVector("encoded"), provider.lookup(1))) {
163 Assert.assertFalse(decoded.isNull(1));
164 Assert.assertTrue(decoded instanceof VarCharVector);
165 Assert.assertArrayEquals("one".getBytes(StandardCharsets.UTF_8), ((VarCharVector) decoded).get(1));
166 }
167 root.close();
168 DictionaryUtils.closeDictionaries(root.getSchema(), provider);
169 }
170 }
171 }
172
173 static class DictionaryProducer extends NoOpFlightProducer {
174
175 private final BufferAllocator allocator;
176
177 public DictionaryProducer(BufferAllocator allocator) {
178 this.allocator = allocator;
179 }
180
181 @Override
182 public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
183 final byte[] zero = "zero".getBytes(StandardCharsets.UTF_8);
184 final byte[] one = "one".getBytes(StandardCharsets.UTF_8);
185 final byte[] two = "two".getBytes(StandardCharsets.UTF_8);
186 try (final VarCharVector dictionaryVector = newVarCharVector("dictionary", allocator)) {
187 final DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
188
189 dictionaryVector.allocateNew(512, 3);
190 dictionaryVector.setSafe(0, zero, 0, zero.length);
191 dictionaryVector.setSafe(1, one, 0, one.length);
192 dictionaryVector.setSafe(2, two, 0, two.length);
193 dictionaryVector.setValueCount(3);
194
195 final Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
196 provider.put(dictionary);
197
198 final FieldVector encodedVector;
199 try (final VarCharVector unencoded = newVarCharVector("encoded", allocator)) {
200 unencoded.allocateNewSafe();
201 unencoded.set(1, one);
202 unencoded.set(2, two);
203 unencoded.set(3, zero);
204 unencoded.set(4, two);
205 unencoded.setValueCount(6);
206 encodedVector = (FieldVector) DictionaryEncoder.encode(unencoded, dictionary);
207 }
208
209 final List<Field> fields = Collections.singletonList(encodedVector.getField());
210 final List<FieldVector> vectors = Collections.singletonList(encodedVector);
211
212 try (final VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, encodedVector.getValueCount())) {
213 listener.start(root, provider);
214 listener.putNext();
215 listener.completed();
216 }
217 }
218 }
219
220 private static VarCharVector newVarCharVector(String name, BufferAllocator allocator) {
221 return (VarCharVector)
222 FieldType.nullable(new ArrowType.Utf8()).createNewSingleVector(name, allocator, null);
223 }
224 }
225 }