]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestLargeMessage.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / test / java / org / apache / arrow / flight / TestLargeMessage.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.flight;
19
20 import java.util.Arrays;
21 import java.util.List;
22 import java.util.stream.Stream;
23
24 import org.apache.arrow.memory.BufferAllocator;
25 import org.apache.arrow.memory.RootAllocator;
26 import org.apache.arrow.vector.IntVector;
27 import org.apache.arrow.vector.VectorSchemaRoot;
28 import org.apache.arrow.vector.types.pojo.ArrowType;
29 import org.apache.arrow.vector.types.pojo.Field;
30 import org.apache.arrow.vector.types.pojo.FieldType;
31 import org.apache.arrow.vector.types.pojo.Schema;
32 import org.junit.Assert;
33 import org.junit.Test;
34
35 public class TestLargeMessage {
36 /**
37 * Make sure a Flight client accepts large message payloads by default.
38 */
39 @Test
40 public void getLargeMessage() throws Exception {
41 try (final BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
42 final Producer producer = new Producer(a);
43 final FlightServer s =
44 FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build())) {
45
46 try (FlightClient client = FlightClient.builder(a, s.getLocation()).build()) {
47 try (FlightStream stream = client.getStream(new Ticket(new byte[]{}));
48 VectorSchemaRoot root = stream.getRoot()) {
49 while (stream.next()) {
50 for (final Field field : root.getSchema().getFields()) {
51 int value = 0;
52 final IntVector iv = (IntVector) root.getVector(field.getName());
53 for (int i = 0; i < root.getRowCount(); i++) {
54 Assert.assertEquals(value, iv.get(i));
55 value++;
56 }
57 }
58 }
59 }
60 }
61 }
62 }
63
64 /**
65 * Make sure a Flight server accepts large message payloads by default.
66 */
67 @Test
68 public void putLargeMessage() throws Exception {
69 try (final BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
70 final Producer producer = new Producer(a);
71 final FlightServer s =
72 FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build()
73 )) {
74
75 try (FlightClient client = FlightClient.builder(a, s.getLocation()).build();
76 BufferAllocator testAllocator = a.newChildAllocator("testcase", 0, Long.MAX_VALUE);
77 VectorSchemaRoot root = generateData(testAllocator)) {
78 final FlightClient.ClientStreamListener listener = client.startPut(FlightDescriptor.path("hello"), root,
79 new AsyncPutListener());
80 listener.putNext();
81 listener.completed();
82 listener.getResult();
83 }
84 }
85 }
86
87 private static VectorSchemaRoot generateData(BufferAllocator allocator) {
88 final int size = 128 * 1024;
89 final List<String> fieldNames = Arrays.asList("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10");
90 final Stream<Field> fields = fieldNames
91 .stream()
92 .map(fieldName -> new Field(fieldName, FieldType.nullable(new ArrowType.Int(32, true)), null));
93 final Schema schema = new Schema(fields::iterator, null);
94
95 final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
96 root.allocateNew();
97 for (final String fieldName : fieldNames) {
98 final IntVector iv = (IntVector) root.getVector(fieldName);
99 iv.setValueCount(size);
100 for (int i = 0; i < size; i++) {
101 iv.set(i, i);
102 }
103 }
104 root.setRowCount(size);
105 return root;
106 }
107
108 private static class Producer implements FlightProducer, AutoCloseable {
109 private final BufferAllocator allocator;
110
111 Producer(BufferAllocator allocator) {
112 this.allocator = allocator;
113 }
114
115 @Override
116 public void getStream(CallContext context, Ticket ticket,
117 ServerStreamListener listener) {
118 try (VectorSchemaRoot root = generateData(allocator)) {
119 listener.start(root);
120 listener.putNext();
121 listener.completed();
122 }
123 }
124
125 @Override
126 public void listFlights(CallContext context, Criteria criteria,
127 StreamListener<FlightInfo> listener) {
128
129 }
130
131 @Override
132 public FlightInfo getFlightInfo(CallContext context,
133 FlightDescriptor descriptor) {
134 return null;
135 }
136
137 @Override
138 public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
139 return () -> {
140 try (VectorSchemaRoot root = flightStream.getRoot()) {
141 while (flightStream.next()) {
142 ;
143 }
144 }
145 };
146 }
147
148 @Override
149 public void doAction(CallContext context, Action action,
150 StreamListener<Result> listener) {
151 listener.onCompleted();
152 }
153
154 @Override
155 public void listActions(CallContext context,
156 StreamListener<ActionType> listener) {
157
158 }
159
160 @Override
161 public void close() throws Exception {
162 allocator.close();
163 }
164 }
165 }