]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / test / java / org / apache / arrow / flight / TestServerOptions.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.flight;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22
23 import java.io.File;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.function.Consumer;
30
31 import org.apache.arrow.flight.TestBasicOperation.Producer;
32 import org.apache.arrow.flight.auth.ServerAuthHandler;
33 import org.apache.arrow.flight.impl.FlightServiceGrpc;
34 import org.apache.arrow.memory.BufferAllocator;
35 import org.apache.arrow.memory.RootAllocator;
36 import org.apache.arrow.vector.IntVector;
37 import org.apache.arrow.vector.VectorSchemaRoot;
38 import org.junit.Assert;
39 import org.junit.Assume;
40 import org.junit.Test;
41 import org.junit.runner.RunWith;
42 import org.junit.runners.JUnit4;
43
44 import io.grpc.MethodDescriptor;
45 import io.grpc.ServerServiceDefinition;
46 import io.grpc.netty.NettyServerBuilder;
47
48 @RunWith(JUnit4.class)
49 public class TestServerOptions {
50
51 @Test
52 public void builderConsumer() throws Exception {
53 final AtomicBoolean consumerCalled = new AtomicBoolean();
54 final Consumer<NettyServerBuilder> consumer = (builder) -> consumerCalled.set(true);
55
56 try (
57 BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
58 Producer producer = new Producer(a);
59 FlightServer s =
60 FlightTestUtil.getStartedServer(
61 (location) -> FlightServer.builder(a, location, producer)
62 .transportHint("grpc.builderConsumer", consumer).build()
63 )) {
64 Assert.assertTrue(consumerCalled.get());
65 }
66 }
67
68 /**
69 * Make sure that if Flight supplies a default executor to gRPC, then it is closed along with the server.
70 */
71 @Test
72 public void defaultExecutorClosed() throws Exception {
73 final ExecutorService executor;
74 try (
75 BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
76 FlightServer server =
77 FlightTestUtil.getStartedServer(
78 (location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
79 .build()
80 )) {
81 assertNotNull(server.grpcExecutor);
82 executor = server.grpcExecutor;
83 }
84 Assert.assertTrue(executor.isShutdown());
85 }
86
87 /**
88 * Make sure that if the user provides an executor to gRPC, then Flight does not close it.
89 */
90 @Test
91 public void suppliedExecutorNotClosed() throws Exception {
92 final ExecutorService executor = Executors.newSingleThreadExecutor();
93 try {
94 try (
95 BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
96 FlightServer server =
97 FlightTestUtil.getStartedServer(
98 (location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
99 .executor(executor)
100 .build()
101 )) {
102 Assert.assertNull(server.grpcExecutor);
103 }
104 Assert.assertFalse(executor.isShutdown());
105 } finally {
106 executor.shutdown();
107 }
108 }
109
110 @Test
111 public void domainSocket() throws Exception {
112 Assume.assumeTrue("We have a native transport available", FlightTestUtil.isNativeTransportAvailable());
113 final File domainSocket = File.createTempFile("flight-unit-test-", ".sock");
114 Assert.assertTrue(domainSocket.delete());
115 // Domain socket paths have a platform-dependent limit. Set a conservative limit and skip the test if the temporary
116 // file name is too long. (We do not assume a particular platform-dependent temporary directory path.)
117 Assume.assumeTrue("The domain socket path is not too long", domainSocket.getAbsolutePath().length() < 100);
118 final Location location = Location.forGrpcDomainSocket(domainSocket.getAbsolutePath());
119 try (
120 BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
121 Producer producer = new Producer(a);
122 FlightServer s =
123 FlightTestUtil.getStartedServer(
124 (port) -> FlightServer.builder(a, location, producer).build()
125 )) {
126 try (FlightClient c = FlightClient.builder(a, location).build()) {
127 try (FlightStream stream = c.getStream(new Ticket(new byte[0]))) {
128 VectorSchemaRoot root = stream.getRoot();
129 IntVector iv = (IntVector) root.getVector("c1");
130 int value = 0;
131 while (stream.next()) {
132 for (int i = 0; i < root.getRowCount(); i++) {
133 Assert.assertEquals(value, iv.get(i));
134 value++;
135 }
136 }
137 }
138 }
139 }
140 }
141
142 @Test
143 public void checkReflectionMetadata() {
144 // This metadata is needed for gRPC reflection to work.
145 final ExecutorService executorService = Executors.newSingleThreadExecutor();
146 try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
147 final FlightBindingService service = new FlightBindingService(allocator, new NoOpFlightProducer(),
148 ServerAuthHandler.NO_OP, executorService);
149 final ServerServiceDefinition definition = service.bindService();
150 assertEquals(FlightServiceGrpc.getServiceDescriptor().getSchemaDescriptor(),
151 definition.getServiceDescriptor().getSchemaDescriptor());
152
153 final Map<String, MethodDescriptor<?, ?>> definedMethods = new HashMap<>();
154 final Map<String, MethodDescriptor<?, ?>> serviceMethods = new HashMap<>();
155
156 // Make sure that the reflection metadata object is identical across all the places where it's accessible
157 definition.getMethods().forEach(
158 method -> definedMethods.put(method.getMethodDescriptor().getFullMethodName(), method.getMethodDescriptor()));
159 definition.getServiceDescriptor().getMethods().forEach(
160 method -> serviceMethods.put(method.getFullMethodName(), method));
161
162 for (final MethodDescriptor<?, ?> descriptor : FlightServiceGrpc.getServiceDescriptor().getMethods()) {
163 final String methodName = descriptor.getFullMethodName();
164 Assert.assertTrue("Method is missing from ServerServiceDefinition: " + methodName,
165 definedMethods.containsKey(methodName));
166 Assert.assertTrue("Method is missing from ServiceDescriptor: " + methodName,
167 definedMethods.containsKey(methodName));
168
169 assertEquals(descriptor.getSchemaDescriptor(), definedMethods.get(methodName).getSchemaDescriptor());
170 assertEquals(descriptor.getSchemaDescriptor(), serviceMethods.get(methodName).getSchemaDescriptor());
171 }
172 } finally {
173 executorService.shutdown();
174 }
175 }
176 }