]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/flight/flight-core/src/test/java/org/apache/arrow/flight/example/TestExampleServer.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / test / java / org / apache / arrow / flight / example / TestExampleServer.java
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.arrow.flight.example;
19
20import java.io.IOException;
21
22import org.apache.arrow.flight.AsyncPutListener;
23import org.apache.arrow.flight.FlightClient;
24import org.apache.arrow.flight.FlightClient.ClientStreamListener;
25import org.apache.arrow.flight.FlightDescriptor;
26import org.apache.arrow.flight.FlightInfo;
27import org.apache.arrow.flight.FlightStream;
28import org.apache.arrow.flight.FlightTestUtil;
29import org.apache.arrow.flight.Location;
30import org.apache.arrow.memory.BufferAllocator;
31import org.apache.arrow.memory.RootAllocator;
32import org.apache.arrow.util.AutoCloseables;
33import org.apache.arrow.vector.IntVector;
34import org.apache.arrow.vector.VectorSchemaRoot;
35import org.junit.After;
36import org.junit.Before;
37import org.junit.Ignore;
38import org.junit.Test;
39
40/**
41 * Ensure that example server supports get and put.
42 */
43public class TestExampleServer {
44
45 private BufferAllocator allocator;
46 private BufferAllocator caseAllocator;
47 private ExampleFlightServer server;
48 private FlightClient client;
49
50 @Before
51 public void start() throws IOException {
52 allocator = new RootAllocator(Long.MAX_VALUE);
53
54 Location l = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 12233);
55 if (!Boolean.getBoolean("disableServer")) {
56 System.out.println("Starting server.");
57 server = new ExampleFlightServer(allocator, l);
58 server.start();
59 } else {
60 System.out.println("Skipping server startup.");
61 }
62 client = FlightClient.builder(allocator, l).build();
63 caseAllocator = allocator.newChildAllocator("test-case", 0, Long.MAX_VALUE);
64 }
65
66 @After
67 public void after() throws Exception {
68 AutoCloseables.close(server, client, caseAllocator, allocator);
69 }
70
71 @Test
72 @Ignore
73 public void putStream() {
74 BufferAllocator a = caseAllocator;
75 final int size = 10;
76
77 IntVector iv = new IntVector("c1", a);
78
79 VectorSchemaRoot root = VectorSchemaRoot.of(iv);
80 ClientStreamListener listener = client.startPut(FlightDescriptor.path("hello"), root,
81 new AsyncPutListener());
82
83 //batch 1
84 root.allocateNew();
85 for (int i = 0; i < size; i++) {
86 iv.set(i, i);
87 }
88 iv.setValueCount(size);
89 root.setRowCount(size);
90 listener.putNext();
91
92 // batch 2
93
94 root.allocateNew();
95 for (int i = 0; i < size; i++) {
96 iv.set(i, i + size);
97 }
98 iv.setValueCount(size);
99 root.setRowCount(size);
100 listener.putNext();
101 root.clear();
102 listener.completed();
103
104 // wait for ack to avoid memory leaks.
105 listener.getResult();
106
107 FlightInfo info = client.getInfo(FlightDescriptor.path("hello"));
108 try (final FlightStream stream = client.getStream(info.getEndpoints().get(0).getTicket())) {
109 VectorSchemaRoot newRoot = stream.getRoot();
110 while (stream.next()) {
111 newRoot.clear();
112 }
113 } catch (Exception e) {
114 throw new RuntimeException(e);
115 }
116 }
117}