]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.flight.example; | |
19 | ||
20 | import java.io.IOException; | |
21 | ||
22 | import org.apache.arrow.flight.AsyncPutListener; | |
23 | import org.apache.arrow.flight.FlightClient; | |
24 | import org.apache.arrow.flight.FlightClient.ClientStreamListener; | |
25 | import org.apache.arrow.flight.FlightDescriptor; | |
26 | import org.apache.arrow.flight.FlightInfo; | |
27 | import org.apache.arrow.flight.FlightStream; | |
28 | import org.apache.arrow.flight.FlightTestUtil; | |
29 | import org.apache.arrow.flight.Location; | |
30 | import org.apache.arrow.memory.BufferAllocator; | |
31 | import org.apache.arrow.memory.RootAllocator; | |
32 | import org.apache.arrow.util.AutoCloseables; | |
33 | import org.apache.arrow.vector.IntVector; | |
34 | import org.apache.arrow.vector.VectorSchemaRoot; | |
35 | import org.junit.After; | |
36 | import org.junit.Before; | |
37 | import org.junit.Ignore; | |
38 | import org.junit.Test; | |
39 | ||
40 | /** | |
41 | * Ensure that example server supports get and put. | |
42 | */ | |
43 | public class TestExampleServer { | |
44 | ||
45 | private BufferAllocator allocator; | |
46 | private BufferAllocator caseAllocator; | |
47 | private ExampleFlightServer server; | |
48 | private FlightClient client; | |
49 | ||
50 | @Before | |
51 | public void start() throws IOException { | |
52 | allocator = new RootAllocator(Long.MAX_VALUE); | |
53 | ||
54 | Location l = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 12233); | |
55 | if (!Boolean.getBoolean("disableServer")) { | |
56 | System.out.println("Starting server."); | |
57 | server = new ExampleFlightServer(allocator, l); | |
58 | server.start(); | |
59 | } else { | |
60 | System.out.println("Skipping server startup."); | |
61 | } | |
62 | client = FlightClient.builder(allocator, l).build(); | |
63 | caseAllocator = allocator.newChildAllocator("test-case", 0, Long.MAX_VALUE); | |
64 | } | |
65 | ||
66 | @After | |
67 | public void after() throws Exception { | |
68 | AutoCloseables.close(server, client, caseAllocator, allocator); | |
69 | } | |
70 | ||
71 | @Test | |
72 | @Ignore | |
73 | public void putStream() { | |
74 | BufferAllocator a = caseAllocator; | |
75 | final int size = 10; | |
76 | ||
77 | IntVector iv = new IntVector("c1", a); | |
78 | ||
79 | VectorSchemaRoot root = VectorSchemaRoot.of(iv); | |
80 | ClientStreamListener listener = client.startPut(FlightDescriptor.path("hello"), root, | |
81 | new AsyncPutListener()); | |
82 | ||
83 | //batch 1 | |
84 | root.allocateNew(); | |
85 | for (int i = 0; i < size; i++) { | |
86 | iv.set(i, i); | |
87 | } | |
88 | iv.setValueCount(size); | |
89 | root.setRowCount(size); | |
90 | listener.putNext(); | |
91 | ||
92 | // batch 2 | |
93 | ||
94 | root.allocateNew(); | |
95 | for (int i = 0; i < size; i++) { | |
96 | iv.set(i, i + size); | |
97 | } | |
98 | iv.setValueCount(size); | |
99 | root.setRowCount(size); | |
100 | listener.putNext(); | |
101 | root.clear(); | |
102 | listener.completed(); | |
103 | ||
104 | // wait for ack to avoid memory leaks. | |
105 | listener.getResult(); | |
106 | ||
107 | FlightInfo info = client.getInfo(FlightDescriptor.path("hello")); | |
108 | try (final FlightStream stream = client.getStream(info.getEndpoints().get(0).getTicket())) { | |
109 | VectorSchemaRoot newRoot = stream.getRoot(); | |
110 | while (stream.next()) { | |
111 | newRoot.clear(); | |
112 | } | |
113 | } catch (Exception e) { | |
114 | throw new RuntimeException(e); | |
115 | } | |
116 | } | |
117 | } |