2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.flight
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertNotNull
;
24 import java
.util
.HashMap
;
26 import java
.util
.concurrent
.ExecutorService
;
27 import java
.util
.concurrent
.Executors
;
28 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
29 import java
.util
.function
.Consumer
;
31 import org
.apache
.arrow
.flight
.TestBasicOperation
.Producer
;
32 import org
.apache
.arrow
.flight
.auth
.ServerAuthHandler
;
33 import org
.apache
.arrow
.flight
.impl
.FlightServiceGrpc
;
34 import org
.apache
.arrow
.memory
.BufferAllocator
;
35 import org
.apache
.arrow
.memory
.RootAllocator
;
36 import org
.apache
.arrow
.vector
.IntVector
;
37 import org
.apache
.arrow
.vector
.VectorSchemaRoot
;
38 import org
.junit
.Assert
;
39 import org
.junit
.Assume
;
40 import org
.junit
.Test
;
41 import org
.junit
.runner
.RunWith
;
42 import org
.junit
.runners
.JUnit4
;
44 import io
.grpc
.MethodDescriptor
;
45 import io
.grpc
.ServerServiceDefinition
;
46 import io
.grpc
.netty
.NettyServerBuilder
;
48 @RunWith(JUnit4
.class)
49 public class TestServerOptions
{
52 public void builderConsumer() throws Exception
{
53 final AtomicBoolean consumerCalled
= new AtomicBoolean();
54 final Consumer
<NettyServerBuilder
> consumer
= (builder
) -> consumerCalled
.set(true);
57 BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
58 Producer producer
= new Producer(a
);
60 FlightTestUtil
.getStartedServer(
61 (location
) -> FlightServer
.builder(a
, location
, producer
)
62 .transportHint("grpc.builderConsumer", consumer
).build()
64 Assert
.assertTrue(consumerCalled
.get());
69 * Make sure that if Flight supplies a default executor to gRPC, then it is closed along with the server.
72 public void defaultExecutorClosed() throws Exception
{
73 final ExecutorService executor
;
75 BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
77 FlightTestUtil
.getStartedServer(
78 (location
) -> FlightServer
.builder(a
, location
, new NoOpFlightProducer())
81 assertNotNull(server
.grpcExecutor
);
82 executor
= server
.grpcExecutor
;
84 Assert
.assertTrue(executor
.isShutdown());
88 * Make sure that if the user provides an executor to gRPC, then Flight does not close it.
91 public void suppliedExecutorNotClosed() throws Exception
{
92 final ExecutorService executor
= Executors
.newSingleThreadExecutor();
95 BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
97 FlightTestUtil
.getStartedServer(
98 (location
) -> FlightServer
.builder(a
, location
, new NoOpFlightProducer())
102 Assert
.assertNull(server
.grpcExecutor
);
104 Assert
.assertFalse(executor
.isShutdown());
111 public void domainSocket() throws Exception
{
112 Assume
.assumeTrue("We have a native transport available", FlightTestUtil
.isNativeTransportAvailable());
113 final File domainSocket
= File
.createTempFile("flight-unit-test-", ".sock");
114 Assert
.assertTrue(domainSocket
.delete());
115 // Domain socket paths have a platform-dependent limit. Set a conservative limit and skip the test if the temporary
116 // file name is too long. (We do not assume a particular platform-dependent temporary directory path.)
117 Assume
.assumeTrue("The domain socket path is not too long", domainSocket
.getAbsolutePath().length() < 100);
118 final Location location
= Location
.forGrpcDomainSocket(domainSocket
.getAbsolutePath());
120 BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
121 Producer producer
= new Producer(a
);
123 FlightTestUtil
.getStartedServer(
124 (port
) -> FlightServer
.builder(a
, location
, producer
).build()
126 try (FlightClient c
= FlightClient
.builder(a
, location
).build()) {
127 try (FlightStream stream
= c
.getStream(new Ticket(new byte[0]))) {
128 VectorSchemaRoot root
= stream
.getRoot();
129 IntVector iv
= (IntVector
) root
.getVector("c1");
131 while (stream
.next()) {
132 for (int i
= 0; i
< root
.getRowCount(); i
++) {
133 Assert
.assertEquals(value
, iv
.get(i
));
143 public void checkReflectionMetadata() {
144 // This metadata is needed for gRPC reflection to work.
145 final ExecutorService executorService
= Executors
.newSingleThreadExecutor();
146 try (final BufferAllocator allocator
= new RootAllocator(Integer
.MAX_VALUE
)) {
147 final FlightBindingService service
= new FlightBindingService(allocator
, new NoOpFlightProducer(),
148 ServerAuthHandler
.NO_OP
, executorService
);
149 final ServerServiceDefinition definition
= service
.bindService();
150 assertEquals(FlightServiceGrpc
.getServiceDescriptor().getSchemaDescriptor(),
151 definition
.getServiceDescriptor().getSchemaDescriptor());
153 final Map
<String
, MethodDescriptor
<?
, ?
>> definedMethods
= new HashMap
<>();
154 final Map
<String
, MethodDescriptor
<?
, ?
>> serviceMethods
= new HashMap
<>();
156 // Make sure that the reflection metadata object is identical across all the places where it's accessible
157 definition
.getMethods().forEach(
158 method
-> definedMethods
.put(method
.getMethodDescriptor().getFullMethodName(), method
.getMethodDescriptor()));
159 definition
.getServiceDescriptor().getMethods().forEach(
160 method
-> serviceMethods
.put(method
.getFullMethodName(), method
));
162 for (final MethodDescriptor
<?
, ?
> descriptor
: FlightServiceGrpc
.getServiceDescriptor().getMethods()) {
163 final String methodName
= descriptor
.getFullMethodName();
164 Assert
.assertTrue("Method is missing from ServerServiceDefinition: " + methodName
,
165 definedMethods
.containsKey(methodName
));
166 Assert
.assertTrue("Method is missing from ServiceDescriptor: " + methodName
,
167 definedMethods
.containsKey(methodName
));
169 assertEquals(descriptor
.getSchemaDescriptor(), definedMethods
.get(methodName
).getSchemaDescriptor());
170 assertEquals(descriptor
.getSchemaDescriptor(), serviceMethods
.get(methodName
).getSchemaDescriptor());
173 executorService
.shutdown();