git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/js/test/unit/ipc/validate.ts
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / js / test / unit / ipc / validate.ts
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18import '../../jest-extensions';
19
20import {
21 Schema,
22 RecordBatch,
23 RecordBatchReader,
24 RecordBatchFileReader,
25 RecordBatchStreamReader,
26} from 'apache-arrow';
27
/**
 * Opens a synchronous reader and runs the standard set of reader assertions.
 *
 * Checks, in order, that the opened reader is a `RecordBatchReader`, that it
 * reports itself as a file reader when `type` is `'file'` (and as a stream
 * reader for every other `type`, including `'json'`), that its schema is a
 * `Schema`, and that iterating it yields exactly `numBatches` record batches.
 * After iteration, `closed` is expected to equal `autoDestroy`.
 *
 * @param type - The kind of source the reader was created from.
 * @param numBatches - The exact number of record batches expected.
 * @param r - The reader to open and validate.
 * @returns The value returned by `r.open()`.
 */
export function validateRecordBatchReader<T extends RecordBatchFileReader | RecordBatchStreamReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) {
    const opened = r.open();
    expect(opened).toBeInstanceOf(RecordBatchReader);

    // Only 'file' sources are expected to be file readers; everything else must be a stream.
    const reportsExpectedKind = type === 'file' ? opened.isFile() : opened.isStream();
    expect(reportsExpectedKind).toBe(true);

    expect(opened.schema).toBeInstanceOf(Schema);

    const batches = opened[Symbol.iterator]();
    validateRecordBatchIterator(numBatches, batches);

    expect(opened.closed).toBe(opened.autoDestroy);
    return opened;
}
37
/**
 * Asynchronous counterpart of the reader validation: awaits `r.open()` and
 * runs the standard set of reader assertions.
 *
 * Checks, in order, that the opened reader is a `RecordBatchReader`, that its
 * schema is a `Schema`, that it reports itself as a file reader when `type` is
 * `'file'` (and as a stream reader for every other `type`, including
 * `'json'`), and that async-iterating it yields exactly `numBatches` record
 * batches. After iteration, `closed` is expected to equal `autoDestroy`.
 *
 * @param type - The kind of source the reader was created from.
 * @param numBatches - The exact number of record batches expected.
 * @param r - The reader to open and validate.
 * @returns A promise resolving to the value produced by awaiting `r.open()`.
 */
export async function validateAsyncRecordBatchReader<T extends RecordBatchReader>(type: 'json' | 'file' | 'stream', numBatches: number, r: T) {
    const opened = await r.open();
    expect(opened).toBeInstanceOf(RecordBatchReader);
    expect(opened.schema).toBeInstanceOf(Schema);

    // Only 'file' sources are expected to be file readers; everything else must be a stream.
    const reportsExpectedKind = type === 'file' ? opened.isFile() : opened.isStream();
    expect(reportsExpectedKind).toBe(true);

    const batches = opened[Symbol.asyncIterator]();
    await validateRecordBatchAsyncIterator(numBatches, batches);

    expect(opened.closed).toBe(opened.autoDestroy);
    return opened;
}
47
/**
 * Drains a synchronous iterable and asserts that it yields exactly
 * `numBatches` values, each of which is a `RecordBatch`.
 *
 * Anything thrown while iterating (including a failed expectation) is
 * rethrown as a new `Error` whose message is prefixed with the running batch
 * count. Once the count has been verified, the iterator's `return()` method
 * is invoked if it has one.
 *
 * @param numBatches - The exact number of record batches expected.
 * @param iterator - The iterable (or iterable iterator) to drain.
 */
export function validateRecordBatchIterator(numBatches: number, iterator: Iterable<RecordBatch> | IterableIterator<RecordBatch>) {
    let seen = 0;
    try {
        for (const batch of iterator) {
            expect(batch).toBeInstanceOf(RecordBatch);
            // The count is bumped before the bound check, so a failure message
            // prefix already includes the offending batch.
            const index = seen;
            seen += 1;
            expect(index).toBeLessThan(numBatches);
        }
    } catch (err) {
        throw new Error(`${seen}: ${err}`);
    }

    expect(seen).toBe(numBatches);

    if (typeof (iterator as any).return !== 'function') {
        return;
    }
    (iterator as any).return();
}
61
/**
 * Drains an async iterable and asserts that it yields exactly `numBatches`
 * values, each of which is a `RecordBatch`.
 *
 * Anything thrown while iterating (including a failed expectation) is
 * rethrown as a new `Error` whose message is prefixed with the running batch
 * count. Once the count has been verified, the iterator's `return()` method
 * is invoked and awaited if it has one.
 *
 * @param numBatches - The exact number of record batches expected.
 * @param iterator - The async iterable (or async iterable iterator) to drain.
 */
export async function validateRecordBatchAsyncIterator(numBatches: number, iterator: AsyncIterable<RecordBatch> | AsyncIterableIterator<RecordBatch>) {
    let seen = 0;
    try {
        for await (const batch of iterator) {
            expect(batch).toBeInstanceOf(RecordBatch);
            // The count is bumped before the bound check, so a failure message
            // prefix already includes the offending batch.
            const index = seen;
            seen += 1;
            expect(index).toBeLessThan(numBatches);
        }
    } catch (err) {
        throw new Error(`${seen}: ${err}`);
    }

    expect(seen).toBe(numBatches);

    if (typeof (iterator as any).return !== 'function') {
        return;
    }
    await (iterator as any).return();
}