]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestCallOptions.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / test / java / org / apache / arrow / flight / TestCallOptions.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.flight;
19
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import io.grpc.Metadata;
34
35 public class TestCallOptions {
36
37 @Test
38 @Ignore
39 public void timeoutFires() {
40 // Ignored due to CI flakiness
41 test((client) -> {
42 Instant start = Instant.now();
43 Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(1, TimeUnit.SECONDS));
44 try {
45 results.next();
46 Assert.fail("Call should have failed");
47 } catch (RuntimeException e) {
48 Assert.assertTrue(e.getMessage(), e.getMessage().contains("deadline exceeded"));
49 }
50 Instant end = Instant.now();
51 Assert.assertTrue("Call took over 1500 ms despite timeout", Duration.between(start, end).toMillis() < 1500);
52 });
53 }
54
55 @Test
56 @Ignore
57 public void underTimeout() {
58 // Ignored due to CI flakiness
59 test((client) -> {
60 Instant start = Instant.now();
61 // This shouldn't fail and it should complete within the timeout
62 Iterator<Result> results = client.doAction(new Action("fast"), CallOptions.timeout(2, TimeUnit.SECONDS));
63 Assert.assertArrayEquals(new byte[]{42, 42}, results.next().getBody());
64 Instant end = Instant.now();
65 Assert.assertTrue("Call took over 2500 ms despite timeout", Duration.between(start, end).toMillis() < 2500);
66 });
67 }
68
69 @Test
70 public void singleProperty() {
71 final FlightCallHeaders headers = new FlightCallHeaders();
72 headers.insert("key", "value");
73 testHeaders(headers);
74 }
75
76 @Test
77 public void multipleProperties() {
78 final FlightCallHeaders headers = new FlightCallHeaders();
79 headers.insert("key", "value");
80 headers.insert("key2", "value2");
81 testHeaders(headers);
82 }
83
84 @Test
85 public void binaryProperties() {
86 final FlightCallHeaders headers = new FlightCallHeaders();
87 headers.insert("key-bin", "value".getBytes());
88 headers.insert("key3-bin", "ëfßæ".getBytes());
89 testHeaders(headers);
90 }
91
92 @Test
93 public void mixedProperties() {
94 final FlightCallHeaders headers = new FlightCallHeaders();
95 headers.insert("key", "value");
96 headers.insert("key3-bin", "ëfßæ".getBytes());
97 testHeaders(headers);
98 }
99
100 private void testHeaders(CallHeaders headers) {
101 try (
102 BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
103 HeaderProducer producer = new HeaderProducer();
104 FlightServer s =
105 FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build());
106 FlightClient client = FlightClient.builder(a, s.getLocation()).build()) {
107 client.doAction(new Action(""), new HeaderCallOption(headers)).hasNext();
108
109 final CallHeaders incomingHeaders = producer.headers();
110 for (String key : headers.keys()) {
111 if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
112 Assert.assertArrayEquals(headers.getByte(key), incomingHeaders.getByte(key));
113 } else {
114 Assert.assertEquals(headers.get(key), incomingHeaders.get(key));
115 }
116 }
117 } catch (InterruptedException | IOException e) {
118 throw new RuntimeException(e);
119 }
120 }
121
122 void test(Consumer<FlightClient> testFn) {
123 try (
124 BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
125 Producer producer = new Producer();
126 FlightServer s =
127 FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build());
128 FlightClient client = FlightClient.builder(a, s.getLocation()).build()) {
129 testFn.accept(client);
130 } catch (InterruptedException | IOException e) {
131 throw new RuntimeException(e);
132 }
133 }
134
135 static class HeaderProducer extends NoOpFlightProducer implements AutoCloseable {
136 CallHeaders headers;
137
138 @Override
139 public void close() {
140 }
141
142 public CallHeaders headers() {
143 return headers;
144 }
145
146 @Override
147 public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
148 this.headers = context.getMiddleware(FlightConstants.HEADER_KEY).headers();
149 listener.onCompleted();
150 }
151 }
152
153 static class Producer extends NoOpFlightProducer implements AutoCloseable {
154
155 Producer() {
156 }
157
158 @Override
159 public void close() {
160 }
161
162 @Override
163 public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
164 switch (action.getType()) {
165 case "hang": {
166 try {
167 Thread.sleep(25000);
168 } catch (InterruptedException e) {
169 throw new RuntimeException(e);
170 }
171 listener.onNext(new Result(new byte[]{}));
172 listener.onCompleted();
173 return;
174 }
175 case "fast": {
176 try {
177 Thread.sleep(500);
178 } catch (InterruptedException e) {
179 throw new RuntimeException(e);
180 }
181 listener.onNext(new Result(new byte[]{42, 42}));
182 listener.onCompleted();
183 return;
184 }
185 default: {
186 throw new UnsupportedOperationException(action.getType());
187 }
188 }
189 }
190 }
191 }