/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.arrow
.flight
;
20 import java
.util
.Arrays
;
21 import java
.util
.List
;
22 import java
.util
.stream
.Stream
;
24 import org
.apache
.arrow
.memory
.BufferAllocator
;
25 import org
.apache
.arrow
.memory
.RootAllocator
;
26 import org
.apache
.arrow
.vector
.IntVector
;
27 import org
.apache
.arrow
.vector
.VectorSchemaRoot
;
28 import org
.apache
.arrow
.vector
.types
.pojo
.ArrowType
;
29 import org
.apache
.arrow
.vector
.types
.pojo
.Field
;
30 import org
.apache
.arrow
.vector
.types
.pojo
.FieldType
;
31 import org
.apache
.arrow
.vector
.types
.pojo
.Schema
;
32 import org
.junit
.Assert
;
33 import org
.junit
.Test
;
35 public class TestLargeMessage
{
37 * Make sure a Flight client accepts large message payloads by default.
40 public void getLargeMessage() throws Exception
{
41 try (final BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
42 final Producer producer
= new Producer(a
);
43 final FlightServer s
=
44 FlightTestUtil
.getStartedServer((location
) -> FlightServer
.builder(a
, location
, producer
).build())) {
46 try (FlightClient client
= FlightClient
.builder(a
, s
.getLocation()).build()) {
47 try (FlightStream stream
= client
.getStream(new Ticket(new byte[]{}));
48 VectorSchemaRoot root
= stream
.getRoot()) {
49 while (stream
.next()) {
50 for (final Field field
: root
.getSchema().getFields()) {
52 final IntVector iv
= (IntVector
) root
.getVector(field
.getName());
53 for (int i
= 0; i
< root
.getRowCount(); i
++) {
54 Assert
.assertEquals(value
, iv
.get(i
));
65 * Make sure a Flight server accepts large message payloads by default.
68 public void putLargeMessage() throws Exception
{
69 try (final BufferAllocator a
= new RootAllocator(Long
.MAX_VALUE
);
70 final Producer producer
= new Producer(a
);
71 final FlightServer s
=
72 FlightTestUtil
.getStartedServer((location
) -> FlightServer
.builder(a
, location
, producer
).build()
75 try (FlightClient client
= FlightClient
.builder(a
, s
.getLocation()).build();
76 BufferAllocator testAllocator
= a
.newChildAllocator("testcase", 0, Long
.MAX_VALUE
);
77 VectorSchemaRoot root
= generateData(testAllocator
)) {
78 final FlightClient
.ClientStreamListener listener
= client
.startPut(FlightDescriptor
.path("hello"), root
,
79 new AsyncPutListener());
87 private static VectorSchemaRoot
generateData(BufferAllocator allocator
) {
88 final int size
= 128 * 1024;
89 final List
<String
> fieldNames
= Arrays
.asList("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10");
90 final Stream
<Field
> fields
= fieldNames
92 .map(fieldName
-> new Field(fieldName
, FieldType
.nullable(new ArrowType
.Int(32, true)), null));
93 final Schema schema
= new Schema(fields
::iterator
, null);
95 final VectorSchemaRoot root
= VectorSchemaRoot
.create(schema
, allocator
);
97 for (final String fieldName
: fieldNames
) {
98 final IntVector iv
= (IntVector
) root
.getVector(fieldName
);
99 iv
.setValueCount(size
);
100 for (int i
= 0; i
< size
; i
++) {
104 root
.setRowCount(size
);
108 private static class Producer
implements FlightProducer
, AutoCloseable
{
/** Allocator handed to {@code generateData} whenever a stream is served. */
private final BufferAllocator allocator;

/**
 * Creates a producer that builds its outgoing data with the given allocator.
 *
 * @param allocator allocator used for every batch this producer generates
 */
Producer(BufferAllocator allocator) {
  this.allocator = allocator;
}
116 public void getStream(CallContext context
, Ticket ticket
,
117 ServerStreamListener listener
) {
118 try (VectorSchemaRoot root
= generateData(allocator
)) {
119 listener
.start(root
);
121 listener
.completed();
126 public void listFlights(CallContext context
, Criteria criteria
,
127 StreamListener
<FlightInfo
> listener
) {
132 public FlightInfo
getFlightInfo(CallContext context
,
133 FlightDescriptor descriptor
) {
138 public Runnable
acceptPut(CallContext context
, FlightStream flightStream
, StreamListener
<PutResult
> ackStream
) {
140 try (VectorSchemaRoot root
= flightStream
.getRoot()) {
141 while (flightStream
.next()) {
149 public void doAction(CallContext context
, Action action
,
150 StreamListener
<Result
> listener
) {
151 listener
.onCompleted();
155 public void listActions(CallContext context
,
156 StreamListener
<ActionType
> listener
) {
161 public void close() throws Exception
{