]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / csharp / test / Apache.Arrow.Tests / ArrowFileReaderTests.cs
1 // Licensed to the Apache Software Foundation (ASF) under one or more
2 // contributor license agreements. See the NOTICE file distributed with
3 // this work for additional information regarding copyright ownership.
4 // The ASF licenses this file to You under the Apache License, Version 2.0
5 // (the "License"); you may not use this file except in compliance with
6 // the License. You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 using Apache.Arrow.Ipc;
17 using Apache.Arrow.Memory;
18 using System;
19 using System.IO;
20 using System.Threading.Tasks;
21 using Xunit;
22
23 namespace Apache.Arrow.Tests
24 {
25 public class ArrowFileReaderTests
26 {
27 [Fact]
28 public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
29 {
30 var stream = new MemoryStream();
31 new ArrowFileReader(stream).Dispose();
32 Assert.Throws<ObjectDisposedException>(() => stream.Position);
33 }
34
35 [Fact]
36 public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
37 {
38 var stream = new MemoryStream();
39 new ArrowFileReader(stream, leaveOpen: false).Dispose();
40 Assert.Throws<ObjectDisposedException>(() => stream.Position);
41 }
42
43 [Fact]
44 public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
45 {
46 var stream = new MemoryStream();
47 new ArrowFileReader(stream, leaveOpen: true).Dispose();
48 Assert.Equal(0, stream.Position);
49 }
50
51 [Theory]
52 [InlineData(true)]
53 [InlineData(false)]
54 public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
55 {
56 RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
57
58 using (MemoryStream stream = new MemoryStream())
59 {
60 ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
61 await writer.WriteRecordBatchAsync(originalBatch);
62 await writer.WriteEndAsync();
63 stream.Position = 0;
64
65 var memoryPool = new TestMemoryAllocator();
66 ArrowFileReader reader = new ArrowFileReader(stream, memoryPool, leaveOpen: shouldLeaveOpen);
67 reader.ReadNextRecordBatch();
68
69 Assert.Equal(1, memoryPool.Statistics.Allocations);
70 Assert.True(memoryPool.Statistics.BytesAllocated > 0);
71
72 reader.Dispose();
73
74 if (shouldLeaveOpen)
75 {
76 Assert.True(stream.Position > 0);
77 }
78 else
79 {
80 Assert.Throws<ObjectDisposedException>(() => stream.Position);
81 }
82 }
83 }
84
85 [Fact]
86 public async Task TestReadNextRecordBatch()
87 {
88 await TestReadRecordBatchHelper((reader, originalBatch) =>
89 {
90 ArrowReaderVerifier.VerifyReader(reader, originalBatch);
91 return Task.CompletedTask;
92 });
93 }
94
95 [Fact]
96 public async Task TestReadNextRecordBatchAsync()
97 {
98 await TestReadRecordBatchHelper(ArrowReaderVerifier.VerifyReaderAsync);
99 }
100
101 [Fact]
102 public async Task TestReadRecordBatchAsync()
103 {
104 await TestReadRecordBatchHelper(async (reader, originalBatch) =>
105 {
106 RecordBatch readBatch = await reader.ReadRecordBatchAsync(0);
107 ArrowReaderVerifier.CompareBatches(originalBatch, readBatch);
108
109 // You should be able to read the same record batch again
110 RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(0);
111 ArrowReaderVerifier.CompareBatches(originalBatch, readBatch2);
112 });
113 }
114
115 private static async Task TestReadRecordBatchHelper(
116 Func<ArrowFileReader, RecordBatch, Task> verificationFunc)
117 {
118 RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
119
120 using (MemoryStream stream = new MemoryStream())
121 {
122 ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
123 await writer.WriteRecordBatchAsync(originalBatch);
124 await writer.WriteEndAsync();
125 stream.Position = 0;
126
127 ArrowFileReader reader = new ArrowFileReader(stream);
128 await verificationFunc(reader, originalBatch);
129 }
130 }
131
132 [Fact]
133 public async Task TestReadMultipleRecordBatchAsync()
134 {
135 RecordBatch originalBatch1 = TestData.CreateSampleRecordBatch(length: 100);
136 RecordBatch originalBatch2 = TestData.CreateSampleRecordBatch(length: 50);
137
138 using (MemoryStream stream = new MemoryStream())
139 {
140 ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch1.Schema);
141 await writer.WriteRecordBatchAsync(originalBatch1);
142 await writer.WriteRecordBatchAsync(originalBatch2);
143 await writer.WriteEndAsync();
144 stream.Position = 0;
145
146 ArrowFileReader reader = new ArrowFileReader(stream);
147 RecordBatch readBatch1 = await reader.ReadRecordBatchAsync(0);
148 ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch1);
149
150 RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(1);
151 ArrowReaderVerifier.CompareBatches(originalBatch2, readBatch2);
152
153 // now read the first again, for random access
154 RecordBatch readBatch3 = await reader.ReadRecordBatchAsync(0);
155 ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch3);
156 }
157 }
158 }
159 }