2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.flight
;
20 import java
.io
.IOException
;
21 import java
.time
.Duration
;
22 import java
.time
.Instant
;
23 import java
.util
.Iterator
;
24 import java
.util
.concurrent
.TimeUnit
;
25 import java
.util
.function
.Consumer
;
27 import org
.apache
.arrow
.memory
.BufferAllocator
;
28 import org
.apache
.arrow
.memory
.RootAllocator
;
29 import org
.junit
.Assert
;
30 import org
.junit
.Ignore
;
31 import org
.junit
.Test
;
33 import io
.grpc
.Metadata
;
35 public class TestCallOptions
{
39 public void timeoutFires() {
40 // Ignored due to CI flakiness
42 Instant start
= Instant
.now();
43 Iterator
<Result
> results
= client
.doAction(new Action("hang"), CallOptions
.timeout(1, TimeUnit
.SECONDS
));
46 Assert
.fail("Call should have failed");
47 } catch (RuntimeException e
) {
48 Assert
.assertTrue(e
.getMessage(), e
.getMessage().contains("deadline exceeded"));
50 Instant end
= Instant
.now();
51 Assert
.assertTrue("Call took over 1500 ms despite timeout", Duration
.between(start
, end
).toMillis() < 1500);
57 public void underTimeout() {
58 // Ignored due to CI flakiness
60 Instant start
= Instant
.now();
61 // This shouldn't fail and it should complete within the timeout
62 Iterator
<Result
> results
= client
.doAction(new Action("fast"), CallOptions
.timeout(2, TimeUnit
.SECONDS
));
63 Assert
.assertArrayEquals(new byte[]{42, 42}, results
.next().getBody());
64 Instant end
= Instant
.now();
65 Assert
.assertTrue("Call took over 2500 ms despite timeout", Duration
.between(start
, end
).toMillis() < 2500);
70 public void singleProperty() {
71 final FlightCallHeaders headers
= new FlightCallHeaders();
72 headers
.insert("key", "value");
77 public void multipleProperties() {
78 final FlightCallHeaders headers
= new FlightCallHeaders();
79 headers
.insert("key", "value");
80 headers
.insert("key2", "value2");
85 public void binaryProperties() {
86 final FlightCallHeaders headers
= new FlightCallHeaders();
87 headers
.insert("key-bin", "value".getBytes());
88 headers
.insert("key3-bin", "ëfßæ".getBytes());
93 public void mixedProperties() {
94 final FlightCallHeaders headers
= new FlightCallHeaders();
95 headers
.insert("key", "value");
96 headers
.insert("key3-bin", "ëfßæ".getBytes());
100 private void testHeaders(CallHeaders headers
) {
102 BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
103 HeaderProducer producer
= new HeaderProducer();
105 FlightTestUtil
.getStartedServer((location
) -> FlightServer
.builder(a
, location
, producer
).build());
106 FlightClient client
= FlightClient
.builder(a
, s
.getLocation()).build()) {
107 client
.doAction(new Action(""), new HeaderCallOption(headers
)).hasNext();
109 final CallHeaders incomingHeaders
= producer
.headers();
110 for (String key
: headers
.keys()) {
111 if (key
.endsWith(Metadata
.BINARY_HEADER_SUFFIX
)) {
112 Assert
.assertArrayEquals(headers
.getByte(key
), incomingHeaders
.getByte(key
));
114 Assert
.assertEquals(headers
.get(key
), incomingHeaders
.get(key
));
117 } catch (InterruptedException
| IOException e
) {
118 throw new RuntimeException(e
);
122 void test(Consumer
<FlightClient
> testFn
) {
124 BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
125 Producer producer
= new Producer();
127 FlightTestUtil
.getStartedServer((location
) -> FlightServer
.builder(a
, location
, producer
).build());
128 FlightClient client
= FlightClient
.builder(a
, s
.getLocation()).build()) {
129 testFn
.accept(client
);
130 } catch (InterruptedException
| IOException e
) {
131 throw new RuntimeException(e
);
135 static class HeaderProducer
extends NoOpFlightProducer
implements AutoCloseable
{
139 public void close() {
142 public CallHeaders
headers() {
147 public void doAction(CallContext context
, Action action
, StreamListener
<Result
> listener
) {
148 this.headers
= context
.getMiddleware(FlightConstants
.HEADER_KEY
).headers();
149 listener
.onCompleted();
153 static class Producer
extends NoOpFlightProducer
implements AutoCloseable
{
159 public void close() {
163 public void doAction(CallContext context
, Action action
, StreamListener
<Result
> listener
) {
164 switch (action
.getType()) {
168 } catch (InterruptedException e
) {
169 throw new RuntimeException(e
);
171 listener
.onNext(new Result(new byte[]{}));
172 listener
.onCompleted();
178 } catch (InterruptedException e
) {
179 throw new RuntimeException(e
);
181 listener
.onNext(new Result(new byte[]{42, 42}));
182 listener
.onCompleted();
186 throw new UnsupportedOperationException(action
.getType());