]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / csharp / test / Apache.Arrow.Flight.Tests / FlightTests.cs
1 // Licensed to the Apache Software Foundation (ASF) under one or more
2 // contributor license agreements. See the NOTICE file distributed with
3 // this work for additional information regarding copyright ownership.
4 // The ASF licenses this file to You under the Apache License, Version 2.0
5 // (the "License"); you may not use this file except in compliance with
6 // the License. You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
19 using System.Threading.Tasks;
20 using Apache.Arrow.Flight.Client;
21 using Apache.Arrow.Flight.TestWeb;
22 using Apache.Arrow.Tests;
23 using Google.Protobuf;
24 using Grpc.Core.Utils;
25 using Xunit;
26
namespace Apache.Arrow.Flight.Tests
{
    /// <summary>
    /// Tests that exercise <see cref="FlightClient"/> against the service exposed by
    /// <see cref="TestWebFactory"/>, which is constructed around a shared <see cref="FlightStore"/>.
    /// </summary>
    /// <remarks>
    /// A new store, web factory and client are created in the constructor, so each test
    /// instance starts from an empty store. Tests either seed the store directly
    /// (<see cref="GivenStoreBatches"/>) and read through the client, or write through the
    /// client and inspect the store (<see cref="GetStoreBatch"/>).
    /// </remarks>
    public class FlightTests : IDisposable
    {
        private readonly TestWebFactory _testWebFactory;
        private readonly FlightClient _flightClient;
        private readonly FlightStore _flightStore;

        /// <summary>
        /// Creates the flight store, the test web factory that wraps it, and a client
        /// connected through the factory's channel.
        /// </summary>
        public FlightTests()
        {
            _flightStore = new FlightStore();
            _testWebFactory = new TestWebFactory(_flightStore);
            _flightClient = new FlightClient(_testWebFactory.GetChannel());
        }

        /// <summary>
        /// Disposes the test web factory created in the constructor.
        /// </summary>
        public void Dispose()
        {
            _testWebFactory.Dispose();
        }

        /// <summary>
        /// Builds a record batch with a single nullable Int32 column named "test" holding
        /// the consecutive values <paramref name="startValue"/> .. <paramref name="startValue"/> + <paramref name="length"/> - 1.
        /// </summary>
        /// <param name="startValue">First value stored in the column.</param>
        /// <param name="length">Number of values (rows) to generate.</param>
        /// <returns>The newly built record batch.</returns>
        private static RecordBatch CreateTestBatch(int startValue, int length)
        {
            var valuesBuilder = new Int32Array.Builder();
            for (int i = 0; i < length; i++)
            {
                valuesBuilder.Append(startValue + i);
            }

            var batchBuilder = new RecordBatch.Builder();
            batchBuilder.Append("test", true, valuesBuilder.Build());
            return batchBuilder.Build();
        }

        /// <summary>
        /// Returns the record batches stored for <paramref name="flightDescriptor"/>,
        /// failing the test if the store has no flight for that descriptor.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor of the flight to look up in the store.</param>
        /// <returns>The batches (with metadata) held by the stored flight.</returns>
        private IEnumerable<RecordBatchWithMetadata> GetStoreBatch(FlightDescriptor flightDescriptor)
        {
            // The cast selects the dictionary-key overload of Assert.Contains.
            Assert.Contains(flightDescriptor, (IReadOnlyDictionary<FlightDescriptor, FlightHolder>)_flightStore.Flights);

            var flightHolder = _flightStore.Flights[flightDescriptor];
            return flightHolder.GetRecordBatches();
        }

        /// <summary>
        /// Seeds the store with a flight for <paramref name="flightDescriptor"/> containing
        /// <paramref name="batches"/>. The flight's schema is taken from the first batch.
        /// </summary>
        /// <param name="flightDescriptor">Descriptor under which the flight is registered.</param>
        /// <param name="batches">Batches to add to the flight; at least one is required.</param>
        /// <returns>The flight info reported by the created flight holder.</returns>
        /// <exception cref="ArgumentException">Thrown when no batch is supplied.</exception>
        private FlightInfo GivenStoreBatches(FlightDescriptor flightDescriptor, params RecordBatchWithMetadata[] batches)
        {
            // The schema comes from the first batch, so an empty list cannot be stored.
            // Fail with a clear message instead of a NullReferenceException.
            if (batches is null || batches.Length == 0)
            {
                throw new ArgumentException("At least one record batch is required to derive the flight schema.", nameof(batches));
            }

            var initialBatch = batches[0];

            var flightHolder = new FlightHolder(flightDescriptor, initialBatch.RecordBatch.Schema, _testWebFactory.GetAddress());

            foreach (var batch in batches)
            {
                flightHolder.AddBatch(batch);
            }

            _flightStore.Flights.Add(flightDescriptor, flightHolder);

            return flightHolder.GetFlightInfo();
        }

        /// <summary>
        /// Writes one batch through the client and expects one put result and one
        /// matching batch in the store.
        /// </summary>
        [Fact]
        public async Task TestPutSingleRecordBatch()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch = CreateTestBatch(0, 100);

            var putStream = _flightClient.StartPut(flightDescriptor);
            await putStream.RequestStream.WriteAsync(expectedBatch);
            await putStream.RequestStream.CompleteAsync();
            var putResults = await putStream.ResponseStream.ToListAsync();

            Assert.Single(putResults);

            // Assert.Single returns the element, so the store sequence is enumerated only once.
            var actualBatch = Assert.Single(GetStoreBatch(flightDescriptor));

            ArrowReaderVerifier.CompareBatches(expectedBatch, actualBatch.RecordBatch);
        }

        /// <summary>
        /// Writes two batches through the client and expects two put results and both
        /// batches in the store, in the order they were written.
        /// </summary>
        [Fact]
        public async Task TestPutTwoRecordBatches()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch1 = CreateTestBatch(0, 100);
            // Use different values from the first batch so the per-index comparison
            // below can detect swapped or duplicated batches.
            var expectedBatch2 = CreateTestBatch(100, 100);

            var putStream = _flightClient.StartPut(flightDescriptor);
            await putStream.RequestStream.WriteAsync(expectedBatch1);
            await putStream.RequestStream.WriteAsync(expectedBatch2);
            await putStream.RequestStream.CompleteAsync();
            var putResults = await putStream.ResponseStream.ToListAsync();

            Assert.Equal(2, putResults.Count);

            var actualBatches = GetStoreBatch(flightDescriptor).ToList();
            Assert.Equal(2, actualBatches.Count);

            ArrowReaderVerifier.CompareBatches(expectedBatch1, actualBatches[0].RecordBatch);
            ArrowReaderVerifier.CompareBatches(expectedBatch2, actualBatches[1].RecordBatch);
        }

        /// <summary>
        /// Seeds one batch in the store and expects to read it back through the
        /// ticket of the flight's single endpoint.
        /// </summary>
        [Fact]
        public async Task TestGetSingleRecordBatch()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch = CreateTestBatch(0, 100);

            // Add the batch to the in-memory store.
            GivenStoreBatches(flightDescriptor, new RecordBatchWithMetadata(expectedBatch));

            // Get the flight info to obtain the ticket.
            var flightInfo = await _flightClient.GetInfo(flightDescriptor);
            var endpoint = Assert.Single(flightInfo.Endpoints);

            var getStream = _flightClient.GetStream(endpoint.Ticket);
            var resultList = await getStream.ResponseStream.ToListAsync();

            var actualBatch = Assert.Single(resultList);
            ArrowReaderVerifier.CompareBatches(expectedBatch, actualBatch);
        }

        /// <summary>
        /// Seeds two batches in the store and expects to read both back, in order,
        /// through the ticket of the flight's single endpoint.
        /// </summary>
        [Fact]
        public async Task TestGetTwoRecordBatch()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch1 = CreateTestBatch(0, 100);
            var expectedBatch2 = CreateTestBatch(100, 100);

            // Add the batches to the in-memory store.
            GivenStoreBatches(flightDescriptor, new RecordBatchWithMetadata(expectedBatch1), new RecordBatchWithMetadata(expectedBatch2));

            // Get the flight info to obtain the ticket.
            var flightInfo = await _flightClient.GetInfo(flightDescriptor);
            var endpoint = Assert.Single(flightInfo.Endpoints);

            var getStream = _flightClient.GetStream(endpoint.Ticket);
            var resultList = await getStream.ResponseStream.ToListAsync();

            Assert.Equal(2, resultList.Count);
            ArrowReaderVerifier.CompareBatches(expectedBatch1, resultList[0]);
            ArrowReaderVerifier.CompareBatches(expectedBatch2, resultList[1]);
        }

        /// <summary>
        /// Seeds a batch with application metadata and expects the same metadata to be
        /// exposed by the response stream while reading the flight.
        /// </summary>
        [Fact]
        public async Task TestGetFlightMetadata()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch1 = CreateTestBatch(0, 100);

            var expectedMetadata = ByteString.CopyFromUtf8("test metadata");
            var expectedMetadataList = new List<ByteString>() { expectedMetadata };

            // Add the batch to the in-memory store.
            GivenStoreBatches(flightDescriptor, new RecordBatchWithMetadata(expectedBatch1, expectedMetadata));

            // Get the flight info to obtain the ticket.
            var flightInfo = await _flightClient.GetInfo(flightDescriptor);
            var endpoint = Assert.Single(flightInfo.Endpoints);

            var getStream = _flightClient.GetStream(endpoint.Ticket);

            // Collect the metadata exposed by the stream after each successful MoveNext.
            var actualMetadata = new List<ByteString>();
            while (await getStream.ResponseStream.MoveNext(default))
            {
                actualMetadata.AddRange(getStream.ResponseStream.ApplicationMetadata);
            }

            Assert.Equal(expectedMetadataList, actualMetadata);
        }

        /// <summary>
        /// Writes one batch with application metadata through the client and expects
        /// both the batch and the metadata to be present in the store.
        /// </summary>
        [Fact]
        public async Task TestPutWithMetadata()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch = CreateTestBatch(0, 100);
            var expectedMetadata = ByteString.CopyFromUtf8("test metadata");

            var putStream = _flightClient.StartPut(flightDescriptor);
            await putStream.RequestStream.WriteAsync(expectedBatch, expectedMetadata);
            await putStream.RequestStream.CompleteAsync();
            var putResults = await putStream.ResponseStream.ToListAsync();

            Assert.Single(putResults);

            // Assert.Single returns the element, so the store sequence is enumerated only once.
            var actualBatch = Assert.Single(GetStoreBatch(flightDescriptor));

            ArrowReaderVerifier.CompareBatches(expectedBatch, actualBatch.RecordBatch);
            Assert.Equal(expectedMetadata, actualBatch.Metadata);
        }

        /// <summary>
        /// Seeds a flight and expects <c>GetSchema</c> to return the schema of the seeded batch.
        /// </summary>
        [Fact]
        public async Task TestGetSchema()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch = CreateTestBatch(0, 100);
            var expectedSchema = expectedBatch.Schema;

            GivenStoreBatches(flightDescriptor, new RecordBatchWithMetadata(expectedBatch));

            var actualSchema = await _flightClient.GetSchema(flightDescriptor);

            SchemaComparer.Compare(expectedSchema, actualSchema);
        }

        /// <summary>
        /// Invokes the "test" action and expects a single result carrying "test data".
        /// </summary>
        [Fact]
        public async Task TestDoAction()
        {
            var expectedResult = new List<FlightResult>()
            {
                new FlightResult("test data")
            };

            var resultStream = _flightClient.DoAction(new FlightAction("test"));
            var actualResult = await resultStream.ResponseStream.ToListAsync();

            Assert.Equal(expectedResult, actualResult);
        }

        /// <summary>
        /// Expects <c>ListActions</c> to return the get, put, delete and test action
        /// types, in that order.
        /// </summary>
        [Fact]
        public async Task TestListActions()
        {
            var expected = new List<FlightActionType>()
            {
                new FlightActionType("get", "get a flight"),
                new FlightActionType("put", "add a flight"),
                new FlightActionType("delete", "delete a flight"),
                new FlightActionType("test", "test action")
            };

            var actual = await _flightClient.ListActions().ResponseStream.ToListAsync();

            Assert.Equal(expected, actual);
        }

        /// <summary>
        /// Seeds two flights and expects <c>ListFlights</c> to return exactly those two
        /// flight infos, in the order they were added.
        /// </summary>
        [Fact]
        public async Task TestListFlights()
        {
            var flightDescriptor1 = FlightDescriptor.CreatePathDescriptor("test1");
            var flightDescriptor2 = FlightDescriptor.CreatePathDescriptor("test2");
            var expectedBatch = CreateTestBatch(0, 100);

            var expectedFlightInfo = new List<FlightInfo>
            {
                GivenStoreBatches(flightDescriptor1, new RecordBatchWithMetadata(expectedBatch)),
                GivenStoreBatches(flightDescriptor2, new RecordBatchWithMetadata(expectedBatch))
            };

            var listFlightStream = _flightClient.ListFlights();

            var actualFlights = await listFlightStream.ResponseStream.ToListAsync();

            // Check the count first: missing flights then fail as an assertion rather
            // than an index error, and unexpected extra flights are detected.
            Assert.Equal(expectedFlightInfo.Count, actualFlights.Count);

            for (int i = 0; i < expectedFlightInfo.Count; i++)
            {
                FlightInfoComparer.Compare(expectedFlightInfo[i], actualFlights[i]);
            }
        }

        /// <summary>
        /// Seeds two batches and expects to read both back, in order, by consuming the
        /// response stream with <c>await foreach</c>.
        /// </summary>
        [Fact]
        public async Task TestGetBatchesWithAsyncEnumerable()
        {
            var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
            var expectedBatch1 = CreateTestBatch(0, 100);
            var expectedBatch2 = CreateTestBatch(100, 100);

            // Add the batches to the in-memory store.
            GivenStoreBatches(flightDescriptor, new RecordBatchWithMetadata(expectedBatch1), new RecordBatchWithMetadata(expectedBatch2));

            // Get the flight info to obtain the ticket.
            var flightInfo = await _flightClient.GetInfo(flightDescriptor);
            var endpoint = Assert.Single(flightInfo.Endpoints);

            var getStream = _flightClient.GetStream(endpoint.Ticket);

            var resultList = new List<RecordBatch>();
            await foreach (var recordBatch in getStream.ResponseStream)
            {
                resultList.Add(recordBatch);
            }

            Assert.Equal(2, resultList.Count);
            ArrowReaderVerifier.CompareBatches(expectedBatch1, resultList[0]);
            ArrowReaderVerifier.CompareBatches(expectedBatch2, resultList[1]);
        }
    }
}