--- /dev/null
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+ public class ArrowFileReaderTests
+ {
+ [Fact]
+ public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
+ {
+ var stream = new MemoryStream();
+ new ArrowFileReader(stream).Dispose();
+ Assert.Throws<ObjectDisposedException>(() => stream.Position);
+ }
+
+ [Fact]
+ public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
+ {
+ var stream = new MemoryStream();
+ new ArrowFileReader(stream, leaveOpen: false).Dispose();
+ Assert.Throws<ObjectDisposedException>(() => stream.Position);
+ }
+
+ [Fact]
+ public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
+ {
+ var stream = new MemoryStream();
+ new ArrowFileReader(stream, leaveOpen: true).Dispose();
+ Assert.Equal(0, stream.Position);
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
+ {
+ RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+
+ using (MemoryStream stream = new MemoryStream())
+ {
+ ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
+ await writer.WriteRecordBatchAsync(originalBatch);
+ await writer.WriteEndAsync();
+ stream.Position = 0;
+
+ var memoryPool = new TestMemoryAllocator();
+ ArrowFileReader reader = new ArrowFileReader(stream, memoryPool, leaveOpen: shouldLeaveOpen);
+ reader.ReadNextRecordBatch();
+
+ Assert.Equal(1, memoryPool.Statistics.Allocations);
+ Assert.True(memoryPool.Statistics.BytesAllocated > 0);
+
+ reader.Dispose();
+
+ if (shouldLeaveOpen)
+ {
+ Assert.True(stream.Position > 0);
+ }
+ else
+ {
+ Assert.Throws<ObjectDisposedException>(() => stream.Position);
+ }
+ }
+ }
+
+ [Fact]
+ public async Task TestReadNextRecordBatch()
+ {
+ await TestReadRecordBatchHelper((reader, originalBatch) =>
+ {
+ ArrowReaderVerifier.VerifyReader(reader, originalBatch);
+ return Task.CompletedTask;
+ });
+ }
+
+ [Fact]
+ public async Task TestReadNextRecordBatchAsync()
+ {
+ await TestReadRecordBatchHelper(ArrowReaderVerifier.VerifyReaderAsync);
+ }
+
+ [Fact]
+ public async Task TestReadRecordBatchAsync()
+ {
+ await TestReadRecordBatchHelper(async (reader, originalBatch) =>
+ {
+ RecordBatch readBatch = await reader.ReadRecordBatchAsync(0);
+ ArrowReaderVerifier.CompareBatches(originalBatch, readBatch);
+
+ // You should be able to read the same record batch again
+ RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(0);
+ ArrowReaderVerifier.CompareBatches(originalBatch, readBatch2);
+ });
+ }
+
+ private static async Task TestReadRecordBatchHelper(
+ Func<ArrowFileReader, RecordBatch, Task> verificationFunc)
+ {
+ RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+
+ using (MemoryStream stream = new MemoryStream())
+ {
+ ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
+ await writer.WriteRecordBatchAsync(originalBatch);
+ await writer.WriteEndAsync();
+ stream.Position = 0;
+
+ ArrowFileReader reader = new ArrowFileReader(stream);
+ await verificationFunc(reader, originalBatch);
+ }
+ }
+
+ [Fact]
+ public async Task TestReadMultipleRecordBatchAsync()
+ {
+ RecordBatch originalBatch1 = TestData.CreateSampleRecordBatch(length: 100);
+ RecordBatch originalBatch2 = TestData.CreateSampleRecordBatch(length: 50);
+
+ using (MemoryStream stream = new MemoryStream())
+ {
+ ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch1.Schema);
+ await writer.WriteRecordBatchAsync(originalBatch1);
+ await writer.WriteRecordBatchAsync(originalBatch2);
+ await writer.WriteEndAsync();
+ stream.Position = 0;
+
+ ArrowFileReader reader = new ArrowFileReader(stream);
+ RecordBatch readBatch1 = await reader.ReadRecordBatchAsync(0);
+ ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch1);
+
+ RecordBatch readBatch2 = await reader.ReadRecordBatchAsync(1);
+ ArrowReaderVerifier.CompareBatches(originalBatch2, readBatch2);
+
+ // now read the first again, for random access
+ RecordBatch readBatch3 = await reader.ReadRecordBatchAsync(0);
+ ArrowReaderVerifier.CompareBatches(originalBatch1, readBatch3);
+ }
+ }
+ }
+}