2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.flight
;
20 import java
.nio
.charset
.StandardCharsets
;
21 import java
.util
.Collections
;
22 import java
.util
.List
;
24 import org
.apache
.arrow
.flight
.FlightClient
.ClientStreamListener
;
25 import org
.apache
.arrow
.flight
.TestBasicOperation
.Producer
;
26 import org
.apache
.arrow
.memory
.BufferAllocator
;
27 import org
.apache
.arrow
.memory
.RootAllocator
;
28 import org
.apache
.arrow
.vector
.FieldVector
;
29 import org
.apache
.arrow
.vector
.ValueVector
;
30 import org
.apache
.arrow
.vector
.VarCharVector
;
31 import org
.apache
.arrow
.vector
.VectorLoader
;
32 import org
.apache
.arrow
.vector
.VectorSchemaRoot
;
33 import org
.apache
.arrow
.vector
.VectorUnloader
;
34 import org
.apache
.arrow
.vector
.dictionary
.Dictionary
;
35 import org
.apache
.arrow
.vector
.dictionary
.DictionaryEncoder
;
36 import org
.apache
.arrow
.vector
.dictionary
.DictionaryProvider
;
37 import org
.apache
.arrow
.vector
.ipc
.message
.ArrowRecordBatch
;
38 import org
.apache
.arrow
.vector
.types
.pojo
.ArrowType
;
39 import org
.apache
.arrow
.vector
.types
.pojo
.DictionaryEncoding
;
40 import org
.apache
.arrow
.vector
.types
.pojo
.Field
;
41 import org
.apache
.arrow
.vector
.types
.pojo
.FieldType
;
42 import org
.apache
.arrow
.vector
.types
.pojo
.Schema
;
43 import org
.junit
.Assert
;
44 import org
.junit
.Ignore
;
45 import org
.junit
.Test
;
46 import org
.junit
.jupiter
.api
.Assertions
;
48 public class TestFlightClient
{
50 * ARROW-5063: make sure two clients to the same location can be closed independently.
53 public void independentShutdown() throws Exception
{
54 try (final BufferAllocator allocator
= new RootAllocator(Integer
.MAX_VALUE
);
55 final FlightServer server
= FlightTestUtil
.getStartedServer(
56 location
-> FlightServer
.builder(allocator
, location
,
57 new Producer(allocator
)).build())) {
58 final Location location
= Location
.forGrpcInsecure(FlightTestUtil
.LOCALHOST
, server
.getPort());
59 final Schema schema
= new Schema(Collections
.singletonList(Field
.nullable("a", new ArrowType
.Int(32, true))));
60 try (final FlightClient client1
= FlightClient
.builder(allocator
, location
).build();
61 final VectorSchemaRoot root
= VectorSchemaRoot
.create(schema
, allocator
)) {
62 // Use startPut as this ensures the RPC won't finish until we want it to
63 final ClientStreamListener listener
= client1
.startPut(FlightDescriptor
.path("test"), root
,
64 new AsyncPutListener());
65 try (final FlightClient client2
= FlightClient
.builder(allocator
, location
).build()) {
66 client2
.listActions().forEach(actionType
-> Assert
.assertNotNull(actionType
.getType()));
75 * ARROW-5978: make sure that we can properly close a client/stream after requesting dictionaries.
77 @Ignore // Unfortunately this test is flaky in CI.
79 public void freeDictionaries() throws Exception
{
80 final Schema expectedSchema
= new Schema(Collections
81 .singletonList(new Field("encoded",
82 new FieldType(true, new ArrowType
.Int(32, true), new DictionaryEncoding(1L, false, null)), null)));
83 try (final BufferAllocator allocator
= new RootAllocator(Integer
.MAX_VALUE
);
84 final BufferAllocator serverAllocator
= allocator
.newChildAllocator("flight-server", 0, Integer
.MAX_VALUE
);
85 final FlightServer server
= FlightTestUtil
.getStartedServer(
86 location
-> FlightServer
.builder(serverAllocator
, location
,
87 new DictionaryProducer(serverAllocator
)).build())) {
88 final Location location
= Location
.forGrpcInsecure(FlightTestUtil
.LOCALHOST
, server
.getPort());
89 try (final FlightClient client
= FlightClient
.builder(allocator
, location
).build()) {
90 try (final FlightStream stream
= client
.getStream(new Ticket(new byte[0]))) {
91 Assert
.assertTrue(stream
.next());
92 Assert
.assertNotNull(stream
.getDictionaryProvider().lookup(1));
93 final VectorSchemaRoot root
= stream
.getRoot();
94 Assert
.assertEquals(expectedSchema
, root
.getSchema());
95 Assert
.assertEquals(6, root
.getVector("encoded").getValueCount());
96 try (final ValueVector decoded
= DictionaryEncoder
97 .decode(root
.getVector("encoded"), stream
.getDictionaryProvider().lookup(1))) {
98 Assert
.assertFalse(decoded
.isNull(1));
99 Assert
.assertTrue(decoded
instanceof VarCharVector
);
100 Assert
.assertArrayEquals("one".getBytes(StandardCharsets
.UTF_8
), ((VarCharVector
) decoded
).get(1));
102 Assert
.assertFalse(stream
.next());
104 // Closing stream fails if it doesn't free dictionaries; closing dictionaries fails (refcount goes negative)
105 // if reference isn't retained in ArrowMessage
111 * ARROW-5978: make sure that dictionary ownership can't be claimed twice.
113 @Ignore // Unfortunately this test is flaky in CI.
115 public void ownDictionaries() throws Exception
{
116 try (final BufferAllocator allocator
= new RootAllocator(Integer
.MAX_VALUE
);
117 final BufferAllocator serverAllocator
= allocator
.newChildAllocator("flight-server", 0, Integer
.MAX_VALUE
);
118 final FlightServer server
= FlightTestUtil
.getStartedServer(
119 location
-> FlightServer
.builder(serverAllocator
, location
,
120 new DictionaryProducer(serverAllocator
)).build())) {
121 final Location location
= Location
.forGrpcInsecure(FlightTestUtil
.LOCALHOST
, server
.getPort());
122 try (final FlightClient client
= FlightClient
.builder(allocator
, location
).build()) {
123 try (final FlightStream stream
= client
.getStream(new Ticket(new byte[0]))) {
124 Assert
.assertTrue(stream
.next());
125 Assert
.assertFalse(stream
.next());
126 final DictionaryProvider provider
= stream
.takeDictionaryOwnership();
127 Assertions
.assertThrows(IllegalStateException
.class, stream
::takeDictionaryOwnership
);
128 Assertions
.assertThrows(IllegalStateException
.class, stream
::getDictionaryProvider
);
129 DictionaryUtils
.closeDictionaries(stream
.getSchema(), provider
);
136 * ARROW-5978: make sure that dictionaries can be used after closing the stream.
138 @Ignore // Unfortunately this test is flaky in CI.
140 public void useDictionariesAfterClose() throws Exception
{
141 try (final BufferAllocator allocator
= new RootAllocator(Integer
.MAX_VALUE
);
142 final BufferAllocator serverAllocator
= allocator
.newChildAllocator("flight-server", 0, Integer
.MAX_VALUE
);
143 final FlightServer server
= FlightTestUtil
.getStartedServer(
144 location
-> FlightServer
.builder(serverAllocator
, location
, new DictionaryProducer(serverAllocator
))
146 final Location location
= Location
.forGrpcInsecure(FlightTestUtil
.LOCALHOST
, server
.getPort());
147 try (final FlightClient client
= FlightClient
.builder(allocator
, location
).build()) {
148 final VectorSchemaRoot root
;
149 final DictionaryProvider provider
;
150 try (final FlightStream stream
= client
.getStream(new Ticket(new byte[0]))) {
151 final VectorUnloader unloader
= new VectorUnloader(stream
.getRoot());
152 root
= VectorSchemaRoot
.create(stream
.getSchema(), allocator
);
153 final VectorLoader loader
= new VectorLoader(root
);
154 while (stream
.next()) {
155 try (final ArrowRecordBatch arb
= unloader
.getRecordBatch()) {
159 provider
= stream
.takeDictionaryOwnership();
161 try (final ValueVector decoded
= DictionaryEncoder
162 .decode(root
.getVector("encoded"), provider
.lookup(1))) {
163 Assert
.assertFalse(decoded
.isNull(1));
164 Assert
.assertTrue(decoded
instanceof VarCharVector
);
165 Assert
.assertArrayEquals("one".getBytes(StandardCharsets
.UTF_8
), ((VarCharVector
) decoded
).get(1));
168 DictionaryUtils
.closeDictionaries(root
.getSchema(), provider
);
173 static class DictionaryProducer
extends NoOpFlightProducer
{
175 private final BufferAllocator allocator
;
177 public DictionaryProducer(BufferAllocator allocator
) {
178 this.allocator
= allocator
;
182 public void getStream(CallContext context
, Ticket ticket
, ServerStreamListener listener
) {
183 final byte[] zero
= "zero".getBytes(StandardCharsets
.UTF_8
);
184 final byte[] one
= "one".getBytes(StandardCharsets
.UTF_8
);
185 final byte[] two
= "two".getBytes(StandardCharsets
.UTF_8
);
186 try (final VarCharVector dictionaryVector
= newVarCharVector("dictionary", allocator
)) {
187 final DictionaryProvider
.MapDictionaryProvider provider
= new DictionaryProvider
.MapDictionaryProvider();
189 dictionaryVector
.allocateNew(512, 3);
190 dictionaryVector
.setSafe(0, zero
, 0, zero
.length
);
191 dictionaryVector
.setSafe(1, one
, 0, one
.length
);
192 dictionaryVector
.setSafe(2, two
, 0, two
.length
);
193 dictionaryVector
.setValueCount(3);
195 final Dictionary dictionary
= new Dictionary(dictionaryVector
, new DictionaryEncoding(1L, false, null));
196 provider
.put(dictionary
);
198 final FieldVector encodedVector
;
199 try (final VarCharVector unencoded
= newVarCharVector("encoded", allocator
)) {
200 unencoded
.allocateNewSafe();
201 unencoded
.set(1, one
);
202 unencoded
.set(2, two
);
203 unencoded
.set(3, zero
);
204 unencoded
.set(4, two
);
205 unencoded
.setValueCount(6);
206 encodedVector
= (FieldVector
) DictionaryEncoder
.encode(unencoded
, dictionary
);
209 final List
<Field
> fields
= Collections
.singletonList(encodedVector
.getField());
210 final List
<FieldVector
> vectors
= Collections
.singletonList(encodedVector
);
212 try (final VectorSchemaRoot root
= new VectorSchemaRoot(fields
, vectors
, encodedVector
.getValueCount())) {
213 listener
.start(root
, provider
);
215 listener
.completed();
220 private static VarCharVector
newVarCharVector(String name
, BufferAllocator allocator
) {
221 return (VarCharVector
)
222 FieldType
.nullable(new ArrowType
.Utf8()).createNewSingleVector(name
, allocator
, null);