]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.flight; | |
19 | ||
20 | import java.io.IOException; | |
21 | import java.time.Duration; | |
22 | import java.time.Instant; | |
23 | import java.util.Iterator; | |
24 | import java.util.concurrent.TimeUnit; | |
25 | import java.util.function.Consumer; | |
26 | ||
27 | import org.apache.arrow.memory.BufferAllocator; | |
28 | import org.apache.arrow.memory.RootAllocator; | |
29 | import org.junit.Assert; | |
30 | import org.junit.Ignore; | |
31 | import org.junit.Test; | |
32 | ||
33 | import io.grpc.Metadata; | |
34 | ||
35 | public class TestCallOptions { | |
36 | ||
37 | @Test | |
38 | @Ignore | |
39 | public void timeoutFires() { | |
40 | // Ignored due to CI flakiness | |
41 | test((client) -> { | |
42 | Instant start = Instant.now(); | |
43 | Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(1, TimeUnit.SECONDS)); | |
44 | try { | |
45 | results.next(); | |
46 | Assert.fail("Call should have failed"); | |
47 | } catch (RuntimeException e) { | |
48 | Assert.assertTrue(e.getMessage(), e.getMessage().contains("deadline exceeded")); | |
49 | } | |
50 | Instant end = Instant.now(); | |
51 | Assert.assertTrue("Call took over 1500 ms despite timeout", Duration.between(start, end).toMillis() < 1500); | |
52 | }); | |
53 | } | |
54 | ||
55 | @Test | |
56 | @Ignore | |
57 | public void underTimeout() { | |
58 | // Ignored due to CI flakiness | |
59 | test((client) -> { | |
60 | Instant start = Instant.now(); | |
61 | // This shouldn't fail and it should complete within the timeout | |
62 | Iterator<Result> results = client.doAction(new Action("fast"), CallOptions.timeout(2, TimeUnit.SECONDS)); | |
63 | Assert.assertArrayEquals(new byte[]{42, 42}, results.next().getBody()); | |
64 | Instant end = Instant.now(); | |
65 | Assert.assertTrue("Call took over 2500 ms despite timeout", Duration.between(start, end).toMillis() < 2500); | |
66 | }); | |
67 | } | |
68 | ||
69 | @Test | |
70 | public void singleProperty() { | |
71 | final FlightCallHeaders headers = new FlightCallHeaders(); | |
72 | headers.insert("key", "value"); | |
73 | testHeaders(headers); | |
74 | } | |
75 | ||
76 | @Test | |
77 | public void multipleProperties() { | |
78 | final FlightCallHeaders headers = new FlightCallHeaders(); | |
79 | headers.insert("key", "value"); | |
80 | headers.insert("key2", "value2"); | |
81 | testHeaders(headers); | |
82 | } | |
83 | ||
84 | @Test | |
85 | public void binaryProperties() { | |
86 | final FlightCallHeaders headers = new FlightCallHeaders(); | |
87 | headers.insert("key-bin", "value".getBytes()); | |
88 | headers.insert("key3-bin", "ëfßæ".getBytes()); | |
89 | testHeaders(headers); | |
90 | } | |
91 | ||
92 | @Test | |
93 | public void mixedProperties() { | |
94 | final FlightCallHeaders headers = new FlightCallHeaders(); | |
95 | headers.insert("key", "value"); | |
96 | headers.insert("key3-bin", "ëfßæ".getBytes()); | |
97 | testHeaders(headers); | |
98 | } | |
99 | ||
100 | private void testHeaders(CallHeaders headers) { | |
101 | try ( | |
102 | BufferAllocator a = new RootAllocator(Long.MAX_VALUE); | |
103 | HeaderProducer producer = new HeaderProducer(); | |
104 | FlightServer s = | |
105 | FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build()); | |
106 | FlightClient client = FlightClient.builder(a, s.getLocation()).build()) { | |
107 | client.doAction(new Action(""), new HeaderCallOption(headers)).hasNext(); | |
108 | ||
109 | final CallHeaders incomingHeaders = producer.headers(); | |
110 | for (String key : headers.keys()) { | |
111 | if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { | |
112 | Assert.assertArrayEquals(headers.getByte(key), incomingHeaders.getByte(key)); | |
113 | } else { | |
114 | Assert.assertEquals(headers.get(key), incomingHeaders.get(key)); | |
115 | } | |
116 | } | |
117 | } catch (InterruptedException | IOException e) { | |
118 | throw new RuntimeException(e); | |
119 | } | |
120 | } | |
121 | ||
122 | void test(Consumer<FlightClient> testFn) { | |
123 | try ( | |
124 | BufferAllocator a = new RootAllocator(Long.MAX_VALUE); | |
125 | Producer producer = new Producer(); | |
126 | FlightServer s = | |
127 | FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build()); | |
128 | FlightClient client = FlightClient.builder(a, s.getLocation()).build()) { | |
129 | testFn.accept(client); | |
130 | } catch (InterruptedException | IOException e) { | |
131 | throw new RuntimeException(e); | |
132 | } | |
133 | } | |
134 | ||
135 | static class HeaderProducer extends NoOpFlightProducer implements AutoCloseable { | |
136 | CallHeaders headers; | |
137 | ||
138 | @Override | |
139 | public void close() { | |
140 | } | |
141 | ||
142 | public CallHeaders headers() { | |
143 | return headers; | |
144 | } | |
145 | ||
146 | @Override | |
147 | public void doAction(CallContext context, Action action, StreamListener<Result> listener) { | |
148 | this.headers = context.getMiddleware(FlightConstants.HEADER_KEY).headers(); | |
149 | listener.onCompleted(); | |
150 | } | |
151 | } | |
152 | ||
153 | static class Producer extends NoOpFlightProducer implements AutoCloseable { | |
154 | ||
155 | Producer() { | |
156 | } | |
157 | ||
158 | @Override | |
159 | public void close() { | |
160 | } | |
161 | ||
162 | @Override | |
163 | public void doAction(CallContext context, Action action, StreamListener<Result> listener) { | |
164 | switch (action.getType()) { | |
165 | case "hang": { | |
166 | try { | |
167 | Thread.sleep(25000); | |
168 | } catch (InterruptedException e) { | |
169 | throw new RuntimeException(e); | |
170 | } | |
171 | listener.onNext(new Result(new byte[]{})); | |
172 | listener.onCompleted(); | |
173 | return; | |
174 | } | |
175 | case "fast": { | |
176 | try { | |
177 | Thread.sleep(500); | |
178 | } catch (InterruptedException e) { | |
179 | throw new RuntimeException(e); | |
180 | } | |
181 | listener.onNext(new Result(new byte[]{42, 42})); | |
182 | listener.onCompleted(); | |
183 | return; | |
184 | } | |
185 | default: { | |
186 | throw new UnsupportedOperationException(action.getType()); | |
187 | } | |
188 | } | |
189 | } | |
190 | } | |
191 | } |