1 // Licensed to the Apache Software Foundation (ASF) under one or more
2 // contributor license agreements. See the NOTICE file distributed with
3 // this work for additional information regarding copyright ownership.
4 // The ASF licenses this file to You under the Apache License, Version 2.0
5 // (the "License"); you may not use this file except in compliance with
6 // the License. You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
16 using Apache.Arrow.Ipc;
17 using Apache.Arrow.Memory;
20 using System.Threading.Tasks;
23 namespace Apache.Arrow.Tests
25 public class ArrowFileReaderTests
// Builds a reader with the single-argument constructor (no leaveOpen value supplied),
// disposes it, and expects the wrapped stream to reject further access.
public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
{
    MemoryStream backingStream = new MemoryStream();

    ArrowFileReader fileReader = new ArrowFileReader(backingStream);
    fileReader.Dispose();

    // Reading Position on a closed MemoryStream throws ObjectDisposedException.
    Assert.Throws<ObjectDisposedException>(() => backingStream.Position);
}
// Builds a reader with leaveOpen explicitly set to false, disposes it, and expects
// the wrapped stream to reject further access.
public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
{
    MemoryStream backingStream = new MemoryStream();

    ArrowFileReader fileReader = new ArrowFileReader(backingStream, leaveOpen: false);
    fileReader.Dispose();

    // Reading Position on a closed MemoryStream throws ObjectDisposedException.
    Assert.Throws<ObjectDisposedException>(() => backingStream.Position);
}
// Builds a reader with leaveOpen set to true, disposes it, and expects the wrapped
// stream to remain usable: its Position can still be read and is still 0.
public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
{
    MemoryStream backingStream = new MemoryStream();

    ArrowFileReader fileReader = new ArrowFileReader(backingStream, leaveOpen: true);
    fileReader.Dispose();

    // Would throw ObjectDisposedException here if the reader had closed the stream.
    Assert.Equal(0, backingStream.Position);
}
// Writes one sample record batch to an in-memory Arrow file, reads it back through a
// reader constructed with a custom memory pool, and checks that:
//   1. the read was served by that pool (exactly one allocation, non-zero bytes), and
//   2. disposing the reader treats the underlying stream according to leaveOpen.
//
// shouldLeaveOpen: forwarded as the reader's leaveOpen argument; it also selects which
// post-dispose stream state is expected (still usable vs. disposed).
public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);

    using (MemoryStream stream = new MemoryStream())
    {
        ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
        await writer.WriteRecordBatchAsync(originalBatch);
        await writer.WriteEndAsync();

        // Writing advances the stream position. Rewind so that the later
        // "Position > 0" check reflects what the reader did rather than where
        // the writer happened to stop.
        stream.Position = 0;

        var memoryPool = new TestMemoryAllocator();

        // Scope the reader so it is disposed even if an assertion below fails;
        // the stream-state checks must run only after the reader is disposed.
        using (ArrowFileReader reader = new ArrowFileReader(stream, memoryPool, leaveOpen: shouldLeaveOpen))
        {
            reader.ReadNextRecordBatch();

            Assert.Equal(1, memoryPool.Statistics.Allocations);
            Assert.True(memoryPool.Statistics.BytesAllocated > 0);
        }

        if (shouldLeaveOpen)
        {
            // Stream must still be usable and must have been advanced by the read.
            Assert.True(stream.Position > 0);
        }
        else
        {
            // Stream must have been closed together with the reader.
            Assert.Throws<ObjectDisposedException>(() => stream.Position);
        }
    }
}
// Runs the shared write-then-read scenario using the synchronous verifier.
public async Task TestReadNextRecordBatch()
{
    // The helper expects a Task-returning callback, so the synchronous verifier
    // is wrapped and an already-completed task is handed back.
    Func<ArrowFileReader, RecordBatch, Task> verifySynchronously = (fileReader, expectedBatch) =>
    {
        ArrowReaderVerifier.VerifyReader(fileReader, expectedBatch);
        return Task.CompletedTask;
    };

    await TestReadRecordBatchHelper(verifySynchronously);
}
// Runs the shared write-then-read scenario using the asynchronous verifier.
public async Task TestReadNextRecordBatchAsync()
{
    Func<ArrowFileReader, RecordBatch, Task> verifyAsynchronously = ArrowReaderVerifier.VerifyReaderAsync;

    await TestReadRecordBatchHelper(verifyAsynchronously);
}
// Runs the shared write-then-read scenario, fetching the batch at index 0 twice
// through ReadRecordBatchAsync and comparing each result with the written batch.
public async Task TestReadRecordBatchAsync()
{
    Func<ArrowFileReader, RecordBatch, Task> readSameBatchTwice = async (fileReader, expectedBatch) =>
    {
        RecordBatch firstRead = await fileReader.ReadRecordBatchAsync(0);
        ArrowReaderVerifier.CompareBatches(expectedBatch, firstRead);

        // Requesting the same index a second time must yield the same batch again.
        RecordBatch secondRead = await fileReader.ReadRecordBatchAsync(0);
        ArrowReaderVerifier.CompareBatches(expectedBatch, secondRead);
    };

    await TestReadRecordBatchHelper(readSameBatchTwice);
}
// Shared scenario for the read tests: writes one sample record batch (length 100)
// to an in-memory Arrow file, opens an ArrowFileReader over it, and hands the reader
// plus the original batch to the supplied verification callback.
//
// verificationFunc: callback that reads from the reader and asserts against the
// original batch; its returned task is awaited before the reader is disposed.
private static async Task TestReadRecordBatchHelper(
    Func<ArrowFileReader, RecordBatch, Task> verificationFunc)
{
    RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);

    using (MemoryStream stream = new MemoryStream())
    {
        ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
        await writer.WriteRecordBatchAsync(originalBatch);
        await writer.WriteEndAsync();

        // Writing advances the stream position; hand the reader a stream that is
        // positioned at the start of the file.
        stream.Position = 0;

        // Dispose the reader deterministically, even when verification throws.
        // With the default leaveOpen this may also close the stream; the outer
        // using then disposes an already-closed MemoryStream, which is harmless.
        using (ArrowFileReader reader = new ArrowFileReader(stream))
        {
            await verificationFunc(reader, originalBatch);
        }
    }
}
// Writes two sample record batches (lengths 100 and 50) into one in-memory Arrow
// file and checks that ReadRecordBatchAsync returns the right batch for index 0,
// index 1, and index 0 again.
public async Task TestReadMultipleRecordBatchAsync()
{
    RecordBatch originalBatch1 = TestData.CreateSampleRecordBatch(length: 100);
    RecordBatch originalBatch2 = TestData.CreateSampleRecordBatch(length: 50);

    using (MemoryStream stream = new MemoryStream())
    {
        // The writer is created with the first batch's schema and then used for both batches.
        ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch1.Schema);
        await writer.WriteRecordBatchAsync(originalBatch1);
        await writer.WriteRecordBatchAsync(originalBatch2);
        await writer.WriteEndAsync();

        // Writing advances the stream position; hand the reader a stream that is
        // positioned at the start of the file.
        stream.Position = 0;

        // Dispose the reader deterministically, even when a comparison fails.
        using (ArrowFileReader reader = new ArrowFileReader(stream))
        {
            RecordBatch readBatch1 = await reader.ReadRecordBatchAsync(0);
            ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch1);

            RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(1);
            ArrowReaderVerifier.CompareBatches(originalBatch2, readBatch2);

            // Go back to the first batch after reading the second, to exercise
            // random access by index.
            RecordBatch readBatch3 = await reader.ReadRecordBatchAsync(0);
            ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch3);
        }
    }
}